阿里巴巴网站图片如何做白,免费ftp网站,濮阳做网站公司,潍坊网站制作熊掌号iomanager IO协程调度模块
以下是从sylar服务器中学的#xff0c;对其的复习#xff1b; 参考资料
继承自协程调度器#xff0c;封装了epoll#xff0c;支持为socket fd注册读写事件回调函数
IO协程调度还解决了调度器在idle状态下忙等待导致CPU占用率高的问题。IO协程调…iomanager IO协程调度模块
以下是从sylar服务器中学的对其的复习 参考资料
继承自协程调度器封装了epoll支持为socket fd注册读写事件回调函数
IO协程调度还解决了调度器在idle状态下忙等待导致CPU占用率高的问题。IO协程调度器使用一对管道fd来tickle调度协程当调度器空闲时idle协程通过epoll_wait阻塞在管道的读描述符上等管道的可读事件。添加新任务时tickle方法写管道idle协程检测到管道可读后退出调度器执行调度
学习本章内容之前必须epoll接口要非常熟悉,参考链接
IO协程调度概述
IO协程调度可以看成是增强版的协程调度。
在前面的协程调度模块中调度器对协程的调度是无条件执行的在调度器已经启动调度的情况下任务一旦添加成功就会排队等待调度器执行。调度器不支持删除调度任务并且调度器在正常退出之前一定会执行完全部的调度任务所以在某种程度上可以认为把一个协程添加到调度器的任务队列就相当于调用了协程的resume方法。
IO协程调度支持协程调度的全部功能因为IO协程调度器是直接继承协程调度器实现的。除了协程调度IO协程调度还增加了IO事件调度的功能这个功能是针对描述符一般是套接字描述符的。IO协程调度支持为描述符注册可读和可写事件的回调函数当描述符可读或可写时执行对应的回调函数。这里可以直接把回调函数等效成协程所以这个功能被称为IO协程调度
IO事件调度功能对服务器开发至关重要因为服务器通常需要处理大量来自客户端的socket fd使用IO事件调度可以将开发者从判断socket fd是否可读或可写的工作中解放出来使得程序员只需要关心socket fd的IO操作。后续的socket api hook模块也依赖IO协程调度。
sylar IO协程调度模块设计
sylar的IO协程调度模块基于epoll实现只支持Linux平台。对每个fdsylar支持两类事件一类是可读事件对应EPOLLIN一类是可写事件对应EPOLLOUTsylar的事件枚举值直接继承自epoll。
当然epoll本身除了支持了EPOLLIN和EPOLLOUT两类事件外还支持其他事件比如EPOLLRDHUP, EPOLLERR, EPOLLHUP等对于这些事件sylar的做法是将其进行归类分别对应到EPOLLIN和EPOLLOUT中也就是所有的事件都可以表示为可读或可写事件甚至有的事件还可以同时表示可读及可写事件比如EPOLLERR事件发生时fd将同时触发可读和可写事件。
对于IO协程调度来说每次调度都包含一个三元组信息分别是描述符-事件类型可读或可写-回调函数调度器记录全部需要调度的三元组信息其中描述符和事件类型用于epoll_wait回调函数用于协程调度。这个三元组信息在源码上通过FdContext结构体来存储在执行epoll_wait时通过epoll_event的私有数据指针data.ptr来保存FdContext结构体信息。
IO协程调度器在idle时会epoll_wait所有注册的fd如果有fd满足条件epoll_wait返回从私有数据中拿到fd的上下文信息并且执行其中的回调函数。实际是idle协程只负责收集所有已触发的fd的回调函数并将其加入调度器的任务队列真正的执行时机是idle协程退出后调度器在下一轮调度时执行
与协程调度器不一样的是IO协程调度器支持取消事件。取消事件表示不关心某个fd的某个事件了如果某个fd的可读或可写事件都被取消了那这个fd会从调度器的epoll_wait中删除。
实现
sylar的IO协程调度器对应IOManager这个类直接继承自Scheduler
class IOManager : public Scheduler {
public:typedef std::shared_ptrIOManager ptr;typedef RWMutex RWMutexType;
...
}读写事件的定义这里直接继承epoll的枚举值
/*** brief IO事件继承自epoll对事件的定义* details 这里只关心socket fd的读和写事件其他epoll事件会归类到这两类事件中*/
enum Event {/// 无事件NONE 0x0,/// 读事件(EPOLLIN)READ 0x1,/// 写事件(EPOLLOUT)WRITE 0x4,
};对描述符-事件类型-回调函数三元组的定义这个三元组也称为fd上下文使用结构体FdContext来表示。由于fd有可读和可写两种事件每种事件的回调函数也可以不一样所以每个fd都需要保存两个事件类型-回调函数组合。FdContext结构体定义如下
/*** brief socket fd上下文类* details 每个socket fd都对应一个FdContext包括fd的值fd上的事件以及fd的读写事件上下文*/
struct FdContext {typedef Mutex MutexType;/*** brief 事件上下文类* details fd的每个事件都有一个事件上下文保存这个事件的回调函数以及执行回调函数的调度器* sylar对fd事件做了简化只预留了读事件和写事件所有的事件都被归类到这两类事件中*/struct EventContext {/// 执行事件回调的调度器Scheduler *scheduler nullptr;/// 事件回调协程Fiber::ptr fiber;/// 事件回调函数std::functionvoid() cb;};/*** brief 获取事件上下文类* param[in] event 事件类型* return 返回对应事件的上下文*/EventContext getEventContext(Event event);/*** brief 重置事件上下文* param[in, out] ctx 待重置的事件上下文对象*/void resetEventContext(EventContext ctx);/*** brief 触发事件* details 根据事件类型调用对应上下文结构中的调度器去调度回调协程或回调函数* param[in] event 事件类型*/void triggerEvent(Event event);/// 读事件上下文EventContext read;/// 写事件上下文EventContext write;/// 事件关联的句柄int fd 0;/// 该fd添加了哪些事件的回调函数或者说该fd关心哪些事件Event events NONE;/// 事件的MutexMutexType mutex;
};IOManager的成员变量。IOManager包含一个epoll实例的句柄m_epfd以及用于tickle的一对pipe fd还有全部的fd上下文m_fdContexts
/// epoll 文件句柄
int m_epfd 0;
/// pipe 文件句柄fd[0]读端fd[1]写端
int m_tickleFds[2];
/// 当前等待执行的IO事件数量
std::atomicsize_t m_pendingEventCount {0};
/// IOManager的Mutex
RWMutexType m_mutex;
/// socket事件上下文的容器
std::vectorFdContext * m_fdContexts;在继承类IOManager中改造协程调度器使其支持epoll并重载tickle和idle实现通知调度协程和IO协程调度功能 /*** brief 构造函数* param[in] threads 线程数量* param[in] use_caller 是否将调用线程包含进去* param[in] name 调度器的名称*/
IOManager::IOManager(size_t threads, bool use_caller, const std::string name): Scheduler(threads, use_caller, name) {// 创建epoll实例m_epfd epoll_create(5000);SYLAR_ASSERT(m_epfd 0);// 创建pipe获取m_tickleFds[2]其中m_tickleFds[0]是管道的读端m_tickleFds[1]是管道的写端int rt pipe(m_tickleFds);SYLAR_ASSERT(!rt);// 注册pipe读句柄的可读事件用于tickle调度协程通过epoll_event.data.fd保存描述符epoll_event event;memset(event, 0, sizeof(epoll_event));event.events EPOLLIN | EPOLLET;event.data.fd m_tickleFds[0];// 非阻塞方式配合边缘触发rt fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK);SYLAR_ASSERT(!rt);// 将管道的读描述符加入epoll多路复用如果管道可读idle中的epoll_wait会返回rt epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], event);SYLAR_ASSERT(!rt);contextResize(32);// 这里直接开启了Schedluer也就是说IOManager创建即可调度协程start();
}/*** brief 通知调度器有任务要调度* details 写pipe让idle协程从epoll_wait退出待idle协程yield之后Scheduler::run就可以调度其他任务* 如果当前没有空闲调度线程那就没必要发通知*/
void IOManager::tickle() {SYLAR_LOG_DEBUG(g_logger) tickle;if(!hasIdleThreads()) {return;}int rt write(m_tickleFds[1], T, 1);SYLAR_ASSERT(rt 1);
}/*** brief idle协程* details 对于IO协程调度来说应阻塞在等待IO事件上idle退出的时机是epoll_wait返回对应的操作是tickle或注册的IO事件就绪* 调度器无调度任务时会阻塞idle协程上对IO调度器而言idle状态应该关注两件事一是有没有新的调度任务对应Schduler::schedule()* 如果有新的调度任务那应该立即退出idle状态并执行对应的任务二是关注当前注册的所有IO事件有没有触发如果有触发那么应该执行* IO事件对应的回调函数*/
void IOManager::idle() {SYLAR_LOG_DEBUG(g_logger) idle;// 一次epoll_wait最多检测256个就绪事件如果就绪事件超过了这个数那么会在下轮epoll_wati继续处理const uint64_t MAX_EVNETS 256;epoll_event *events new epoll_event[MAX_EVNETS]();std::shared_ptrepoll_event shared_events(events, [](epoll_event *ptr) {delete[] ptr;});while (true) {if(stopping()) {SYLAR_LOG_DEBUG(g_logger) name getName() idle stopping exit;break;}// 阻塞在epoll_wait上等待事件发生static const int MAX_TIMEOUT 5000;int rt epoll_wait(m_epfd, events, MAX_EVNETS, MAX_TIMEOUT);if(rt 0) {if(errno EINTR) {continue;}SYLAR_LOG_ERROR(g_logger) epoll_wait( m_epfd ) (rt rt ) (errno errno ) (errstr: strerror(errno) );break;}// 遍历所有发生的事件根据epoll_event的私有指针找到对应的FdContext进行事件处理for (int i 0; i rt; i) {epoll_event event events[i];if (event.data.fd m_tickleFds[0]) {// ticklefd[0]用于通知协程调度这时只需要把管道里的内容读完即可本轮idle结束Scheduler::run会重新执行协程调度uint8_t dummy[256];while (read(m_tickleFds[0], dummy, sizeof(dummy)) 0);continue;}// 通过epoll_event的私有指针获取FdContextFdContext *fd_ctx (FdContext *)event.data.ptr;FdContext::MutexType::Lock lock(fd_ctx-mutex);/*** EPOLLERR: 出错比如写读端已经关闭的pipe* EPOLLHUP: 套接字对端关闭* 出现这两种事件应该同时触发fd的读和写事件否则有可能出现注册的事件永远执行不到的情况*/if (event.events (EPOLLERR | EPOLLHUP)) {event.events | (EPOLLIN | EPOLLOUT) fd_ctx-events;}int real_events NONE;if (event.events EPOLLIN) {real_events | READ;}if (event.events EPOLLOUT) {real_events | WRITE;}if ((fd_ctx-events real_events) NONE) {continue;}// 剔除已经发生的事件将剩下的事件重新加入epoll_wait// 如果剩下的事件为0表示这个fd已经不需要关注了直接从epoll中删除int left_events (fd_ctx-events ~real_events);int op left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;event.events EPOLLET | left_events;int rt2 epoll_ctl(m_epfd, op, fd_ctx-fd, event);if (rt2) {SYLAR_LOG_ERROR(g_logger) epoll_ctl( m_epfd , (EpollCtlOp)op , fd_ctx-fd , (EPOLL_EVENTS)event.events ): rt2 ( errno ) ( strerror(errno) );continue;}// 处理已经发生的事件也就是让调度器调度指定的函数或协程if (real_events READ) {fd_ctx-triggerEvent(READ);--m_pendingEventCount;}if (real_events WRITE) {fd_ctx-triggerEvent(WRITE);--m_pendingEventCount;}} // end for/*** 一旦处理完所有的事件idle协程yield这样可以让调度协程(Scheduler::run)重新检查是否有新任务要调度* 上面triggerEvent实际也只是把对应的fiber重新加入调度要执行的话还要等idle协程退出*/Fiber::ptr cur Fiber::GetThis();auto raw_ptr cur.get();cur.reset();raw_ptr-yield();} // end while(true)
}注册事件回调addEvent删除事件回调delEvent取消事件回调cancelEvent以及取消全部事件cancelAll
/*** brief 添加事件* details fd描述符发生了event事件时执行cb函数* param[in] fd socket句柄* param[in] event 事件类型* param[in] cb 事件回调函数如果为空则默认把当前协程作为回调执行体* return 添加成功返回0,失败返回-1*/
int IOManager::addEvent(int fd, Event event, std::functionvoid() cb) {// 找到fd对应的FdContext如果不存在那就分配一个FdContext *fd_ctx nullptr;RWMutexType::ReadLock lock(m_mutex);if ((int)m_fdContexts.size() fd) {fd_ctx m_fdContexts[fd];lock.unlock();} else {lock.unlock();RWMutexType::WriteLock lock2(m_mutex);contextResize(fd * 1.5);fd_ctx m_fdContexts[fd];}// 同一个fd不允许重复添加相同的事件FdContext::MutexType::Lock lock2(fd_ctx-mutex);if (SYLAR_UNLIKELY(fd_ctx-events event)) {SYLAR_LOG_ERROR(g_logger) addEvent assert fd fd event (EPOLL_EVENTS)event fd_ctx.event (EPOLL_EVENTS)fd_ctx-events;SYLAR_ASSERT(!(fd_ctx-events event));}// 将新的事件加入epoll_wait使用epoll_event的私有指针存储FdContext的位置int op fd_ctx-events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;epoll_event epevent;epevent.events EPOLLET | fd_ctx-events | event;epevent.data.ptr fd_ctx;int rt epoll_ctl(m_epfd, op, fd, epevent);if (rt) {SYLAR_LOG_ERROR(g_logger) epoll_ctl( m_epfd , (EpollCtlOp)op , fd , (EPOLL_EVENTS)epevent.events ): rt ( errno ) ( strerror(errno) ) fd_ctx-events (EPOLL_EVENTS)fd_ctx-events;return -1;}// 待执行IO事件数加1m_pendingEventCount;// 找到这个fd的event事件对应的EventContext对其中的scheduler, cb, fiber进行赋值fd_ctx-events (Event)(fd_ctx-events | event);FdContext::EventContext event_ctx fd_ctx-getEventContext(event);SYLAR_ASSERT(!event_ctx.scheduler !event_ctx.fiber !event_ctx.cb);// 赋值scheduler和回调函数如果回调函数为空则把当前协程当成回调执行体event_ctx.scheduler Scheduler::GetThis();if (cb) {event_ctx.cb.swap(cb);} else {event_ctx.fiber Fiber::GetThis();SYLAR_ASSERT2(event_ctx.fiber-getState() Fiber::RUNNING, state event_ctx.fiber-getState());}return 0;
}
/*** brief 删除事件* param[in] fd socket句柄* param[in] event 事件类型* attention 不会触发事件* return 是否删除成功*/
bool IOManager::delEvent(int fd, Event event) {// 找到fd对应的FdContextRWMutexType::ReadLock lock(m_mutex);if ((int)m_fdContexts.size() fd) {return false;}FdContext *fd_ctx m_fdContexts[fd];lock.unlock();FdContext::MutexType::Lock lock2(fd_ctx-mutex);if (SYLAR_UNLIKELY(!(fd_ctx-events event))) {return false;}// 清除指定的事件表示不关心这个事件了如果清除之后结果为0则从epoll_wait中删除该文件描述符Event new_events (Event)(fd_ctx-events ~event);int op new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;epoll_event epevent;epevent.events EPOLLET | new_events;epevent.data.ptr fd_ctx;int rt epoll_ctl(m_epfd, op, fd, epevent);if (rt) {SYLAR_LOG_ERROR(g_logger) epoll_ctl( m_epfd , (EpollCtlOp)op , fd , (EPOLL_EVENTS)epevent.events ): rt ( errno ) ( strerror(errno) );return false;}// 待执行事件数减1--m_pendingEventCount;// 重置该fd对应的event事件上下文fd_ctx-events new_events;FdContext::EventContext event_ctx fd_ctx-getEventContext(event);fd_ctx-resetEventContext(event_ctx);return true;
}/*** brief 取消事件* param[in] fd socket句柄* param[in] event 事件类型* attention 如果该事件被注册过回调那就触发一次回调事件* return 是否删除成功*/
bool IOManager::cancelEvent(int fd, Event event) {// 找到fd对应的FdContextRWMutexType::ReadLock lock(m_mutex);if ((int)m_fdContexts.size() fd) {return false;}FdContext *fd_ctx m_fdContexts[fd];lock.unlock();FdContext::MutexType::Lock lock2(fd_ctx-mutex);if (SYLAR_UNLIKELY(!(fd_ctx-events event))) {return false;}// 删除事件Event new_events (Event)(fd_ctx-events ~event);int op new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;epoll_event epevent;epevent.events EPOLLET | new_events;epevent.data.ptr fd_ctx;int rt epoll_ctl(m_epfd, op, fd, epevent);if (rt) {SYLAR_LOG_ERROR(g_logger) epoll_ctl( m_epfd , (EpollCtlOp)op , fd , (EPOLL_EVENTS)epevent.events ): rt ( errno ) ( strerror(errno) );return false;}// 删除之前触发一次事件fd_ctx-triggerEvent(event);// 活跃事件数减1--m_pendingEventCount;return true;
}/*** brief 取消所有事件* details 所有被注册的回调事件在cancel之前都会被执行一次* param[in] fd socket句柄* return 是否删除成功*/
bool IOManager::cancelAll(int fd) {// 找到fd对应的FdContextRWMutexType::ReadLock lock(m_mutex);if ((int)m_fdContexts.size() fd) {return false;}FdContext *fd_ctx m_fdContexts[fd];lock.unlock();FdContext::MutexType::Lock lock2(fd_ctx-mutex);if (!fd_ctx-events) {return false;}// 删除全部事件int op EPOLL_CTL_DEL;epoll_event epevent;epevent.events 0;epevent.data.ptr fd_ctx;int rt epoll_ctl(m_epfd, op, fd, epevent);if (rt) {SYLAR_LOG_ERROR(g_logger) epoll_ctl( m_epfd , (EpollCtlOp)op , fd , (EPOLL_EVENTS)epevent.events ): rt ( errno ) ( strerror(errno) );return false;}// 触发全部已注册的事件if (fd_ctx-events READ) {fd_ctx-triggerEvent(READ);--m_pendingEventCount;}if (fd_ctx-events WRITE) {fd_ctx-triggerEvent(WRITE);--m_pendingEventCount;}SYLAR_ASSERT(fd_ctx-events 0);return true;
}IOManager的析构函数实现和stopping重载。对于IOManager的析构首先要等Scheduler调度完所有的任务然后再关闭epoll句柄和pipe句柄然后释放所有的FdContext对于stoppingIOManager在判断是否可退出时还要加上所有IO事件都完成调度的条件
IOManager::~IOManager() {stop();close(m_epfd);close(m_tickleFds[0]);close(m_tickleFds[1]);for (size_t i 0; i m_fdContexts.size(); i) {if (m_fdContexts[i]) {delete m_fdContexts[i];}}
}
bool IOManager::stopping() {// 对于IOManager而言必须等所有待调度的IO事件都执行完了才可以退出return m_pendingEventCount 0 Scheduler::stopping();
}