移动宽带 怎么建设网站,网站做营销推广的公司,wordpress cms系统,做目录的网站首发原文链接#xff1a;Swoole 源码分析之 Coroutine 协程模块 大家好#xff0c;我是码农先森。
引言
协程又称轻量级线程#xff0c;但与线程不同的是#xff1b;协程是用户级线程#xff0c;不需要操作系统参与。由用户显式控制#xff0c;可以在需要的时候挂起、或…首发原文链接Swoole 源码分析之 Coroutine 协程模块 大家好我是码农先森。
引言
协程又称轻量级线程但与线程不同的是协程是用户级线程不需要操作系统参与。由用户显式控制可以在需要的时候挂起、或恢复执行。
通过协程程序可以在执行的过程中保存当前的状态并在恢复后从该状态处继续执行整体上来说创建、销毁、切换的成本低。
但在 Swoole 中的协程是无法利用多核 CPU 的如果想利用多核 CPU 则需要依赖 Swoole 的多进程模型。
协程的出现为 Swoole 程序提升并发效率、及系统的处理能力注入了强劲的动力可以说是 Swoole 作为高性能通信框架的的核心模块。
源码拆解
这次我们以下面这段代码来作为本次拆解源码的切入点。
// 协程容器
Swoole\Coroutine\run(function () {// Socket 协程客户端$socket new Swoole\Coroutine\Socket(AF_INET, SOCK_STREAM, 0);// 建立连接在建立连接的过程中会发生协程切换$retval $socket-connect(127.0.0.1, 9601);if ($retval) {// 发送数据在发送数据的过程中会发生协程切换$n $socket-send(hello);var_dump($n);// 解释数据在接收数据的过程中会发生协程切换$data $socket-recv();var_dump($data);// 关闭连接$socket-close();}
});这段代码主要是使用 Socket 的协程客户端与本地的 9601 端口建立连接并且发送、接收数据。在分析源码之前我对这次的源码做了一个图解梳理把整个调用链路上的函数串联了起来。我们可以先对整体有个大致的了解便于后面分析源代码。
Socket 协程客户端
Socket 协程客户端是专门用于 Swoole 在协程环境中使用的可以实现在 IO 调用时切换协程让出 CPU 的使用权。例如在连接建立、发送数据、接收数据 等阶段会进行协程的切换。
这个函数主要是发起 Socket 连接的建立并且在 wait_event 这个函数内部实现了协程的切换。
// swoole-src/src/coroutine/socket.cc:595
bool Socket::connect(const struct sockaddr *addr, socklen_t addrlen) {if (sw_unlikely(!is_available(SW_EVENT_RDWR))) {return false;}int retval;do {// 发起连接建立retval ::connect(sock_fd, addr, addrlen);} while (retval 0 errno EINTR);if (retval 0) {if (errno ! EINPROGRESS) {set_err(errno);return false;} else {TimerController timer(write_timer, connect_timeout, this, timer_callback);// wait_event 这个函数内部实现了协程的切换if (!timer.start() || !wait_event(SW_EVENT_WRITE)) {if (is_closed()) {set_err(ECONNABORTED);}return false;} else {if (socket-get_option(SOL_SOCKET, SO_ERROR, errCode) 0 || errCode ! 0) {set_err(errCode);return false;}}}}connected true;set_err(0);return true;
}再看看 wait_event 函数的内部实现先是获取到当前的协程然后根据事件的类型调用函数 add_event 将事件添加到事件管理的结构体中最后将当前的协程切换出去让出其 CPU 的控制权。
// swoole-src/src/coroutine/socket.cc:147
bool Socket::wait_event(const EventType event, const void **__buf, size_t __n) {EventType added_event event;// 获取到当前的协程Coroutine *co Coroutine::get_current_safe();if (!co) {return false;}if (sw_unlikely(socket-close_wait)) {set_err(SW_ERROR_CO_SOCKET_CLOSE_WAIT);return false;}// clear the last errCodeset_err(0);
#ifdef SW_USE_OPENSSL// 根据事件的类型调用函数 add_event 将事件添加到事件管理的结构体中if (sw_unlikely(socket-ssl ((event SW_EVENT_READ socket-ssl_want_write) ||(event SW_EVENT_WRITE socket-ssl_want_read)))) {if (sw_likely(socket-ssl_want_write add_event(SW_EVENT_WRITE))) {want_event SW_EVENT_WRITE;} else if (socket-ssl_want_read add_event(SW_EVENT_READ)) {want_event SW_EVENT_READ;} else {return false;}added_event want_event;} else
#endifif (sw_unlikely(!add_event(event))) {return false;}swoole_trace_log(SW_TRACE_SOCKET,socket#%d blongs to cid#%ld is waiting for %s event,sock_fd,co-get_cid(),get_wait_event_name(this, event));Coroutine::CancelFunc cancel_fn [this, event](Coroutine *co) { return cancel(event); };// 将当前的协程切换出去让出其 CPU 的控制权if (sw_likely(event SW_EVENT_READ)) {read_co co;read_co-yield(cancel_fn);read_co nullptr;} else if (event SW_EVENT_WRITE) {if (sw_unlikely(!zero_copy __n 0 *__buf ! get_write_buffer()-str)) {write_buffer-clear();if (write_buffer-append((const char *) *__buf, __n) ! SW_OK) {set_err(ENOMEM);goto _failed;}*__buf write_buffer-str;}write_co co;write_co-yield(cancel_fn);write_co nullptr;} else {assert(0);return false;}
_failed:
#ifdef SW_USE_OPENSSL// maybe read_co and write_co are all waiting for the same event when we use SSLif (sw_likely(want_event SW_EVENT_NULL || !has_bound()))
#endif{Reactor *reactor SwooleTG.reactor;if (sw_likely(added_event SW_EVENT_READ)) {reactor-remove_read_event(socket);} else {reactor-remove_write_event(socket);}}
#ifdef SW_USE_OPENSSLwant_event SW_EVENT_NULL;
#endifswoole_trace_log(SW_TRACE_SOCKET,socket#%d blongs to cid#%ld trigger %s event,sock_fd,co-get_cid(),get_trigger_event_name(this, added_event));return !is_closed() !errCode;
}同理 send() 和 recv() 函数也和 connect() 函数是一样的实现方式。
// swoole-src/src/coroutine/socket.cc:847
ssize_t Socket::send(const void *__buf, size_t __n) {if (sw_unlikely(!is_available(SW_EVENT_WRITE))) {return -1;}ssize_t retval;TimerController timer(write_timer, write_timeout, this, timer_callback);do {// 发送数据retval socket-send(__buf, __n, 0);} while (retval 0 socket-catch_write_error(errno) SW_WAIT timer.start() wait_event(SW_EVENT_WRITE, __buf, __n));check_return_value(retval);return retval;
}// swoole-src/src/coroutine/socket.cc:874
ssize_t Socket::recv(void *__buf, size_t __n) {if (sw_unlikely(!is_available(SW_EVENT_READ))) {return -1;}ssize_t retval;TimerController timer(read_timer, read_timeout, this, timer_callback);do {// 接收数据retval socket-recv(__buf, __n, 0);} while (retval 0 socket-catch_read_error(errno) SW_WAIT timer.start() wait_event(SW_EVENT_READ));check_return_value(retval);return retval;
}也是调用 wait_event() 函数来实现当前的协程切换唯一的区别就是事件的类型不同一个是读事件一个是写事件。
Run 协程容器
在 Swoole 中要想使用协程那么必须要在协程的环境中使用协程的客户端或者支持 Hook 的原生 PHP 函数。才能享受到 Swoole 中协程带来的高性能不然和普通的 PHP 执行程序没有什么区别变成了同步阻塞。
在源码中协程容器主要是实现了事件循环的初始化、协程上下文的创建管理、事件循环的 IO 事件监听接下来我们会主要分析关于事件管理的部分内容。
// swoole-src/src/coroutine/base.cc:210
namespace coroutine {bool run(const CoroutineFunc fn, void *arg) {// 事件循环的初始化if (swoole_event_init(SW_EVENTLOOP_WAIT_EXIT) 0) {return false;}// 协程上下文的创建管理Coroutine::activate();long cid Coroutine::create(fn, arg);// 事件循环的 IO 事件监听swoole_event_wait();Coroutine::deactivate();return cid 0;}
}Event 事件初始化
Event 事件初始化主要是定义一些事件的回调函数便于在事件被触发时恢复对应的协程进行后续的逻辑处理例如读事件回调函数 readable_event_callback、写事件回调函数 writable_event_callback 等。
// swoole-src/src/wrapper/event.cc:37
int swoole_event_init(int flags) {if (!SwooleG.init) {std::unique_lockstd::mutex lock(init_lock);swoole_init();}// 创建一个 Reactor 实例对象Reactor *reactor new Reactor(SW_REACTOR_MAXEVENTS);if (!reactor-ready()) {return SW_ERR;}if (flags SW_EVENTLOOP_WAIT_EXIT) {reactor-wait_exit 1;}// Socket 事件初始化coroutine::Socket::init_reactor(reactor);coroutine::System::init_reactor(reactor);network::Client::init_reactor(reactor);SwooleTG.reactor reactor;return SW_OK;
}// swoole-src/include/swoole_coroutine_sokcet.h:157
static inline void init_reactor(Reactor *reactor) {// 定义对应事件的回调函数reactor-set_handler(SW_FD_CO_SOCKET | SW_EVENT_READ, readable_event_callback);reactor-set_handler(SW_FD_CO_SOCKET | SW_EVENT_WRITE, writable_event_callback);reactor-set_handler(SW_FD_CO_SOCKET | SW_EVENT_ERROR, error_event_callback);
}// swoole-src/src/coroutine/socket.c:48
int Socket::readable_event_callback(Reactor *reactor, Event *event) {Socket *socket (Socket *) event-socket-object;socket-set_err(0);
#ifdef SW_USE_OPENSSLif (sw_unlikely(socket-want_event ! SW_EVENT_NULL)) {if (socket-want_event SW_EVENT_READ) {// 恢复对应的协程socket-write_co-resume();}} else
#endif{if (socket-recv_barrier (*socket-recv_barrier)() !event-socket-event_hup) {return SW_OK;}// 恢复对应的协程socket-read_co-resume();}return SW_OK;
}Event 事件监听
Event 事件监听主要是针对被加入到事件循环中的 Socket 进行 IO 事件的监听如果有读或写 IO 事件的触发则回调到对应的处理函数上进行执行。
// swoole-src/src/warpper/event.cc:84
int swoole_event_wait() {Reactor *reactor SwooleTG.reactor;int retval 0;if (!reactor-wait_exit or !reactor-if_exit()) {// 事件循环等待调用retval reactor-wait(nullptr);}swoole_event_free();return retval;
}// swoole-src/src/reactor/epoll.cc:153
int ReactorEpoll::wait(struct timeval *timeo) {Event event;ReactorHandler handler;int i, n, ret;int reactor_id reactor_-id;int max_event_num reactor_-max_event_num;if (reactor_-timeout_msec 0) {if (timeo nullptr) {reactor_-timeout_msec -1;} else {reactor_-timeout_msec timeo-tv_sec * 1000 timeo-tv_usec / 1000;}}reactor_-before_wait();while (reactor_-running) {if (reactor_-onBegin ! nullptr) {reactor_-onBegin(reactor_);}// 监听 IO 事件n epoll_wait(epfd_, events_, max_event_num, reactor_-get_timeout_msec());if (n 0) {if (!reactor_-catch_error()) {swoole_sys_warning([Reactor#%d] epoll_wait failed, reactor_id);return SW_ERR;} else {goto _continue;}} else if (n 0) {reactor_-execute_end_callbacks(true);SW_REACTOR_CONTINUE;}for (i 0; i n; i) {event.reactor_id reactor_id;event.socket (Socket *) events_[i].data.ptr;event.type event.socket-fd_type;event.fd event.socket-fd;if (events_[i].events (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {event.socket-event_hup 1;}// read 读事件这里的 handler 对应 readable_event_callbackif ((events_[i].events EPOLLIN) !event.socket-removed) {handler reactor_-get_handler(SW_EVENT_READ, event.type);ret handler(reactor_, event);if (ret 0) {swoole_sys_warning(EPOLLIN handle failed. fd%d, event.fd);}}// write 写事件这里的 handler 对应 writable_event_callbackif ((events_[i].events EPOLLOUT) !event.socket-removed) {handler reactor_-get_handler(SW_EVENT_WRITE, event.type);ret handler(reactor_, event);if (ret 0) {swoole_sys_warning(EPOLLOUT handle failed. fd%d, event.fd);}}// error 错误处理这里的 handler 对应 error_event_callbackif ((events_[i].events (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) !event.socket-removed) {// ignore ERR and HUP, because event is already processed at IN and OUT handler.if ((events_[i].events EPOLLIN) || (events_[i].events EPOLLOUT)) {continue;}handler reactor_-get_error_handler(event.type);ret handler(reactor_, event);if (ret 0) {swoole_sys_warning(EPOLLERR handle failed. fd%d, event.fd);}}if (!event.socket-removed (event.socket-events SW_EVENT_ONCE)) {reactor_-_del(event.socket);}}_continue:reactor_-execute_end_callbacks(false);SW_REACTOR_CONTINUE;}return 0;
}总结
协程又称轻量级线程协程是用户级线程不需要操作系统参与创建切换成本低。Swoole 中的协程是无法利用多核 CPU 的如果想利用多核 CPU 则需要依赖 Swoole 的多进程模型。Swoole 中协程的是利用的 Event 事件循环进行调度的将遇到 IO 操作的 Socket 统一加入到事件循环中。本次的源码分析旨在了解整个协程在 Swoole 中的运行逻辑打开我们的思路便于我们更好的体会到协程所带来的高性能价值。