网站 建设开发合同模板,知乎网站怎么做推广,自己房子怎么挂网站做民宿,施工企业领导带班记录高级IO—poll,epoll,reactor 文章目录 高级IO—poll,epoll,reactorpoll函数poll函数接口poll服务器 epollepoll的系统调用epoll_createepoll_ctlepoll_wait epoll的工作原理epoll的工作方式水平触发边缘触发 epoll服务器 reactor poll函数
poll函数是一个用于多路复用的系统调…高级IO—poll,epoll,reactor 文章目录 高级IO—poll,epoll,reactorpoll函数poll函数接口poll服务器 epollepoll的系统调用epoll_createepoll_ctlepoll_wait epoll的工作原理epoll的工作方式水平触发边缘触发 epoll服务器 reactor poll函数
poll函数是一个用于多路复用的系统调用类似于select函数用于监视一组文件描述符的状态。
poll函数接口
函数原型 #include poll.h
int poll(struct pollfd *fds, nfds_t nfds, int timeout);fds指向一个struct pollfd结构数组的指针每个结构体描述一个要监视的文件描述符及其关注的事件。 nfdsfds数组中的结构体数量。 timeout超时时间以毫秒为单位。设置为-1表示阻塞式设置为0表示非阻塞设置为大于0的数例如设置为5000表示阻塞5秒5秒后非阻塞返回一次。
struct pollfd结构体定义如下
struct pollfd {int fd; // 文件描述符short events; // 关注的事件short revents; // 实际发生的事件-short-整数
};event字段用于设置关注的事件可以使用如下宏通过event宏事件可以将事件添加到event中 revent字段用于返回实际发生的事件也可以使用上述宏进行判断。通过revent|宏事件可以得知revent中是否包含了该就绪事件 poll函数的返回值是就绪文件描述符的数量返回值大于0表示已经有文件描述符就绪返回值等于0表示没有文件描述符就绪返回值小于0表示出错可以使用perror输出错误信息。
总结一下
在poll函数的pollfd结构体中有event参数保存用户需要OS关心的事件有revent参数保存OS告诉用户就绪的事件即做到了输入输出分离不需要像select函数借助第三方数组对sock进行管理。poll函数等待文件描述符理论上没有上限。由于参数fds是一个动态数组并不是一个确定的结构。不同于位图动态数组可以动态扩容。且动态数组的大小取决于用户传入的nfds。
poll服务器
现对select服务器套用poll函数改造成poll服务器。只需要对实现业务hpp改造即可 pollserver.hpp #pragma once#include iostream
#include sys/select.h
#include string
#include functional
#includepoll.h
#include Sock.hppusing namespace std;namespace Poll_sv
{static const int defaultport 8080; // 默认端口号static const int defaultfd -1; // 默认套接字标志static const int fdnum2048;//设置文件描述符的数量using func_t functionstring(const string );class PollServer{public:PollServer(func_t f, int port defaultport) : _func(f), _port(port), _listensock(-1),_rfds(nullptr){}void initServer(){// 获取套接字_listensock Sock::Socket();cout Sock success endl;// 绑定网络信息Sock::Bind(_listensock, _port);cout Bind success endl;// 把套接字设置为监听状态Sock::Listen(_listensock);cout Listen success endl;_rfdsnew struct pollfd[fdnum];//指针指向一个成员是poll结构体的数组for(int i0;ifdnum;i){_rfds[i].fddefaultfd;_rfds[i].events0;_rfds[i].revents0;}_rfds[0].fd_listensock;_rfds[0].eventsPOLLIN;// cout initServer endl;}void Print(){cout now using socket: ;for (int i 0; i fdnum; i){if(_rfds[i].fd!defaultfd)cout_rfds[i].fd ;//打印正在使用的fd}cout endl;}void Accpter(int lsock){// logMessage(DEBUG, Accepter begin);string clientip;uint16_t clientport 0;int sock Sock::Accpet(lsock, clientip, clientport); // 若成功返回返回一个用于通信的套接字if (sock 0)return;logMessage(NORMAL, accept success [%s:%d], clientip.c_str(), clientport);int i0;for(;ifdnum;i){if(_rfds[i].fd!defaultfd)//这里是找到默认的位置给后续需要使用的文件描述符用因此是跳过已经被使用的位置continue;else break;}if(ifdnum){close(sock);logMessage(WARNING,fd full,please wait);}else{_rfds[i].fdsock;_rfds[i].eventsPOLLIN;_rfds[i].revents0;logMessage(NORMAL,sock has set in rfds);}Print();// logMessage(DEBUG, Accepter end);}void Recver(int pos){// logMessage(DEBUG, Recver begin);char buffer[1024];ssize_t s recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0);if (s 0){buffer[s] 0;cout client# buffer endl;}else if (s 0){close(_rfds[pos].fd); // 关闭该套接字关闭通信通道_rfds[pos].fd defaultfd; // 将数组中的该套接字清除logMessage(NORMAL, client quit);return;}else{close(_rfds[pos].fd);_rfds[pos].fd defaultfd; // 将数组中的该套接字清除logMessage(ERROR, recv error);return;}// 将客户端发来的数据原样写回去string resp _func(buffer);write(_rfds[pos].fd, resp.c_str(), resp.size()); // 写回去// logMessage(DEBUG, Recever end);}void Handlerop(){for (int i 0; i fdnum; i){if (_rfds[i].fd defaultfd)//fd没有被设置则跳过continue;if(!(_rfds[i].eventsPOLLIN)) continue;//结构体不是被指定标志位POLLIN设置过则跳过if (_rfds[i].fd_listensock_rfds[i].reventsPOLLIN)// 此时i对应的数组位置是拿到连接的文件描述符意味着在底层连接已经拿到等待上层提取{Accpter(_listensock);}else if (_rfds[i].reventsPOLLIN) // 此时存在数组内的对应套接字都是底层读资源就绪{Recver(i);}else{}}}void Start(){int timeout-1;for(;;){// coutpoll readyendl;int npoll(_rfds,fdnum,timeout);// coutpoll finishedendl;switch(n){case 0:logMessage(NORMAL,timeout...);break;case -1:logMessage(WARNING,poll error);break;default:logMessage(NORMAL,poll success);Handlerop();break;}}}~PollServer(){if (_listensock 0) // 为什么是小于0close(_listensock);if(_rfds!nullptr)delete[]_rfds;}private:int _port;int _listensock;struct pollfd* _rfds;//指向poll结构体的指针func_t _func;};
}总结一下
poll函数通过输入输出分离避免了借用第三方数组来记录sock和事件。poll解决了select具有管理文件描述符数量上限的问题。poll依旧存在遍历问题。由于检查就绪事件依旧是需要遍历整个数组即时间复杂度为O(N)。
epoll模型可以解决遍历问题使得检查就绪事件的时间复杂度优化到O(1)。
epoll
epoll的系统调用
epoll_create
epoll_create用于创建一个epoll模型
函数原型
int epoll_create(int size);size 指定了 epoll 实例能够同时监视的文件描述符的数量上限。返回值为一个非负整数的文件描述符epollfd表示创建的epoll模型对象。创建失败返回值为-1。可以传递epollfd给epoll_ctl函数像epoll模型中添加、修改、删除需要监视的文件描述符以及事件。注意一下使用epoll模型后需要关闭epollfd。
epoll_ctl
epoll_ctl 函数用于向 epoll 实例中添加、修改或删除要监视的文件描述符并设置关注的事件。
函数原型
#include sys/epoll.hint epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);epfdepoll 模型的文件描述符由 epoll_create 函数返回。op操作类型可以是这些值 EPOLL_CTL_ADD将文件描述符 fd 添加到 epoll 模型中。 EPOLL_CTL_MOD修改已经添加到 epoll模型中的文件描述符 fd 的关注事件。 EPOLL_CTL_DEL从 epoll模型中删除文件描述符 fd。fd要添加、修改或删除的文件描述符。event指向 struct epoll_event 结构体的指针用于设置关注的事件。
struct epoll_event结构体定义如下
struct epoll_event {_uint32_t events; // 关注的事件epoll_data_t data; // 用户数据
};events字段用于设置关注的事件。可以使用以下宏设置EPOLLIN可读事件。EPOLLOUT可写事件。EPOLLPRI紧急事件。EPOLLERR错误事件。EPOLLHUP挂起事件。EPOLLET边缘触发模式。EPOLLONESHOT一次性事件。
epoll_data_t结构定义如下
typedef union epoll_data
{void *ptr;int fd;uint32_t u32;uint64_t u64;
} epoll_data_t;fd是用户传递需要监管的文件描述符。 调用成功返回0失败返回-1。
epoll_wait
epoll_wait函数用于收集epoll模型中已经就绪的事件并将就绪事件的个数返回
函数原型
#include sys/epoll.hint epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);epfdepoll 实例的文件描述符由 epoll_create 函数返回。events指向 struct epoll_event 结构体数组的指针用于存储就绪的事件。maxeventsevents 数组的大小表示最多可以存储多少个事件。timeout等待事件的超时时间单位为毫秒。设置为-1表示阻塞式设置为0表示非阻塞设置为大于0的数例如设置为5000表示阻塞5秒5秒后非阻塞返回一次。函数的返回值表示就绪的事件数量返回值大于0表示已经有事件就绪返回值等于0表示超时返回无事发生。返回值小于0表示调用出错。
epoll的工作原理
数据本身只能按照输入设备—内存—CPU—内存—输出设备的方向流动。 网络数据到达主机时是先到达网卡即网课外设。CPU有很多针脚外设虽然不会与CPU有直接的数据流通但外设可以将信号直接发送到CPU的阵脚上硬件中断。外设通过中断设备将电子信号发送到CPU上。CPU会将该信号转发到中断向量表根据信号的值找到对应的表中位置下标该表是一张函数指针数组根据指针能够调用驱动方法驱动方法将数据拷贝到内存上即数据从硬件传输到了OS中。 在OS中会以红黑树的方式管理sock和events。每个节点上都有sock和event当然还有left指针和right指针。红黑树的优点是查找效率高。当用户告诉内核那些事件需要被关心时OS会将需要关心的事件、sock放到该红黑树当中。当事件就绪OS会将红黑树上的节点添加到就绪队列中该就绪队列中的成员表示内核需要告诉用户那些事件已经就绪等待上层将该事件资源取走。实际上一个节点即能存在于红黑树中也能存在于就绪队列中。 总结一下 网络数据到来后外设通过信号中断将数据拷贝到内存上。细致的说是底层收到数据后贯穿网络协议栈向上交付数据可以把红黑树的节点看作成文件描述符sock根据sock找到对应的文件结构体struct file该结构体内有指针指向接收缓存区然后将数据填充到该文件接收缓冲区中。随后调用struct file中的回调方法void* private data指针指向一个回调函数-回调机制该回调方法会将红黑树中的节点添加到就绪队列当中表示该事件已经就绪通知用户来取走数据。 红黑树、就绪队列、epoll管理的文件struct file以及部分网络协议栈一整套可以认作成一个epoll模型。进程可以通过文件描述符表找到epoll_create的返回值epollfdepollfd会指向自己的struct file在struct file中有关联指针能够找到对应的epoll模型。
重新看待epoll的相关接口。操作系统需要提供接口该上层使用。 #include sys/epoll.h
int epoll_create(int size);epoll_create创建epoll模型并返回一个epollfd供上层使用。
#include sys/epoll.h
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);epoll_ctl根据提供的epollfd找到对应的epoll模型根据提供的fd和event增删改红黑树中的节点。
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);epoll_wait根据epollfd找到对应的epoll模型根据提供的events、maxevents、timeout管理就绪队列中的节点。而epoll_wait检测就绪事件只需要检测已经就绪的事件不需要遍历检测整个需要管理的事件表只需要将就绪的事件资源从内核层拷贝到用户层。因此遍历的事件复杂度为O(1)。
其次epoll_wait会按照传入的顺序依次放回到就绪事件数组中。根据队列先进先出的特性若就绪队列中有很多数据节点一次性拿不完可以下一次调用epoll_wait再拿。
而伴随epoll模型的创建的辅助数组就是该管理需要关心事件的红黑树。红黑树节点是以kv形式存在由于sock是不会重复的因此sock作为key。
epoll相比于select、poll将大部分管理任务分配给操作系统负责也因此epoll的接口相比较简单。
epoll的工作方式
前面都有提到select、poll、epoll在事件就绪的时候通知上层将事件处理。事件就绪可以认为底层的IO条件满足可以进行某种IO行为epoll就会通知上层进行这种IO行为将底层的数据提取走。
epoll有2种通知策略即水平触发(LT)和边缘触发(ET)
水平触发 水平触发为Level Triggered简称LT。epoll默认状态下是LT工作策略。 水平触发关心的是缓冲区的状态当缓冲区可读的时候就会一直发出通知也就是当缓冲区中只要有数据就会发出通知知道上层将缓冲区的所有数据读完。可以认为你在做老师布置的作业写了但没写完的情况下老师就会一直通知你写作业直到作业完全写完。 LT的优势在于可以在读取数据的时候只读取一部分在第二次调用epoll_wait时立刻返回并通知上层将底层数据读走。 支持阻塞读写和阻塞读写。‘
边缘触发
边缘触发为Edge Triggered。简称ET。边缘触发关心的是缓冲区状态的变化当缓冲区状态发生变化的时候才会发出通知比如缓冲区中来了新的数据。底层有数据ET模式下epoll只会通知上层一次后续缓冲区来了新的数据epoll才会再次通知。ET优势在于ET是一次性通知的方式倒逼上层尽量做到一次性将数据读完。其次是尽可能一次性读取多的数据从而使得接收缓冲区可容纳下一次数据的空间尽可能的大那么接收方的接收能力自然就强能够告诉发送数据方接收方的滑动窗口较大让对方更新出更大的滑动窗口提高数据发送的效率。ET模式下文件描述符要求是非阻塞的。由于底层的事件到达增多OS才会通知上层将数据取走因此用户提取数据时尽量调用一次read,recv就把数据取完而为了避免一次性调用函数没有读完就需要循环调用读取函数当调用读取函数直到读取不到数据才算作读完数据。若fd是阻塞的那么读取函数进行最后一次读取时读取不到数据就会阻塞因此fd需要是非阻塞的读取不到数据直接返回。
LT模式下fd可以是阻塞的也可以是非阻塞的。LT可以模拟ET工作方式。
epoll服务器 epollserver.cc #includeiostream
#includefunctional
#includevector
#includememory
#includeerr.hpp
#includeepollserver.hpp
using namespace std;
using namespace EPoll_sv;// static void Usage(string proc)
// {
// cerrUsage:\n\tproc port \n\n;
// }string resp(const string s)
{return s;
}int main(int argc,char* argv[])
{// if(argc!2)// {// Usage(argv[0]);// exit(USAGE_ERR);// }unique_ptrEpollServer epolsv(new EpollServer(resp));epolsv-initServer();epolsv-start();return 0;
}log.hpp #pragma once#include iostream
#include string
#includectime
#include sys/types.h#include unistd.h#include stdio.h
#include stdarg.h
using namespace std;
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4#define NUM 1024
#define LOG_STR ./logstr.txt
#define LOG_ERR ./log.errenum
{USAGE_ERR 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR,EPOLL_CREATE_ERR
};
const char* to_str(int level)
{switch(level){case DEBUG: return DEBUG;case NORMAL: return NORMAL;case WARNING: return WARNING;case ERROR: return ERROR;case FATAL: return FATAL;default: return nullptr;}
}void logMessage(int level, const char* format,...)
{char logprestr[NUM];
snprintf(logprestr,sizeof(logprestr),[%s][%ld][%d],to_str(level),(long int)time(nullptr),getpid());char logeldstr[NUM];
va_list arg;
va_start(arg,format);
vsnprintf(logeldstr,sizeof(logeldstr),format,arg);//arg是logmessage函数列表中的...coutlogprestrlogeldstrendl;}Sock.hpp #pragma once#includeiostream
#includestring
#includecstring
#includesys/time.h
#include sys/types.h
#include unistd.h
#include sys/socket.h
#include netinet/in.h
#include arpa/inet.h
#include log.hpp
#include err.hppclass Sock
{const static int backlog32;public:static int Socket(){int socksocket(AF_INET,SOCK_STREAM,0);//创建套接字if(sock0)//创建失败{logMessage(FATAL,create sock error);exit(SOCKET_ERR);}//创建成功logMessage(NORMAL,create sock success);int opt1;setsockopt(sock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,opt,sizeof(opt));//允许套接字关闭后立刻重启return sock;} static void Bind(int sock,int port){//绑定自己的网络信息struct sockaddr_in local;memset(local,0,sizeof(local));//将结构体清空local.sin_familyAF_INET;//添加协议local.sin_porthtons(port);//添加端口号local.sin_addr.s_addrhtons(INADDR_ANY);//不绑定指定IP可以接收任意IP主机发送来的数据//将本地设置的信息绑定到网络协议栈if (bind(sock,(struct sockaddr*)local,sizeof(local))0){logMessage(FATAL,bind socket error);exit(BIND_ERR);}logMessage(NORMAL,bind socket success);}static void Listen(int sock)//将套接字设置为监听{if(listen(sock,0)0){logMessage(FATAL,listen socket error);exit(LISTEN_ERR);}logMessage(NORMAL,listen socket success);}static int Accpet(int listensock,string * clientip,uint16_t* clientport){struct sockaddr_in cli;socklen_t len sizeof(cli);int sockaccept(listensock,(struct sockaddr*)cli,len);if(sock0){logMessage(FATAL,accept error);//这里accept失败为什么不退出}else{logMessage(NORMAL,accept a new link,get new sock : %d,sock);*clientipinet_ntoa(cli.sin_addr);*clientportntohs(cli.sin_port);}return sock;}
};默认的LT模式下的epollserver epollserver.hpp #pragma once#include iostream
#include sys/select.h
#include string
#include functional
#includesys/epoll.h
#includeerr.hpp
#include Sock.hppusing namespace std;namespace EPoll_sv
{static const int defaultport 8080;static const int size 128;static const int defaultvalue -1;static const int defaultnum 64;using func_t functionstring(const string);class EpollServer{public:EpollServer(func_t fun,const int port defaultport) :_num(defaultnum), _port(port), _listensock(defaultvalue), _epfd(defaultvalue),_func(fun){}void handlerEvent(int evs){for(int i0;ievs;i)//直接遍历已经就绪的事件{uint32_t event_reves[i].events;int sock_reves[i].data.fd;if(sock_listensock(eventEPOLLIN))//当前是将连接拿上应用层的文件描述符{string clientip;uint16_t clientport;int fdSock::Accpet(_listensock,clientip,clientport);if(fd0){logMessage(NORMAL,accpet sock error);continue;}struct epoll_event ev;ev.data.fdfd;ev.eventsEPOLLIN;epoll_ctl(_epfd,EPOLL_CTL_ADD,fd,ev);}else if(eventEPOLLIN)//当前是通信的事件{char buffer[1024];int nrecv(sock,buffer,sizeof(buffer)-1,0);if(n0){buffer[n]0;coutclient# bufferendl;string resp_func(buffer);send(sock,resp.c_str(),resp.size(),0);//将数据发送回去给客户端}else if(n0){epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);//将sock从epoll中的结构中移除close(sock);//关闭socklogMessage(NORMAL,client quit);}else{epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);//将sock从epoll中的结构中移除close(sock);//关闭socklogMessage(ERROR,communicate error);}}else{}}logMessage(DEBUG,handlerEvent out);}void initServer(){// 获取套接字_listensock Sock::Socket();cout Sock success endl;// 绑定网络信息Sock::Bind(_listensock, _port);cout Bind success endl;// 把套接字设置为监听状态Sock::Listen(_listensock);cout Listen success endl;_epfdepoll_create(size);//调用成功返回一个epoll文件描述符size表示是需要监听的文件描述符的数量if(_epfd0){logMessage(FATAL,epoll_create error);exit(EPOLL_CREATE_ERR);}//将listensock添加到epoll模型中struct epoll_event epev;epev.eventsEPOLLIN;epev.data.fd_listensock;epoll_ctl(_epfd,EPOLL_CTL_ADD,_listensock,epev);//申请就绪时间的空间_revesnew struct epoll_event[_num];//申请一块空间内含_num个事件数logMessage(NORMAL, init server success);}void start(){//等待就绪事件int timeout-1;for(;;){logMessage(DEBUG,epoll_wait ready);int reepoll_wait(_epfd,_reves,_num,timeout);logMessage(DEBUG,epoll_wait end);switch (re){case 0://0个事件就绪即超时重传logMessage(NORMAL,timeout...);break;case -1://epoll_wait函数调用失败logMessage(ERROR,epoll_wait error,code: %d,errstring: %s,errno,strerror(errno)); default://到这里时返回值都大于0即re为已经就绪的事件数logMessage(NORMAL,wait incident success);// handlerEvent(re);break;}}}~EpollServer(){if(_listensock!defaultvalue){close(_listensock);}if(_epfd!defaultvalue){close(_epfd);}if(_reves!nullptr){delete[]_reves;}}private:int _port;int _listensock;int _epfd;struct epoll_event* _reves;int _num;//事件数func_t _func;//外部传递进来的函数};
}客户端连接上但不处理就会一直通知。
ET模式下的epollserver服务器 epollserver.hpp #pragma once#include iostream
#include sys/select.h
#include string
#include functional
#includesys/epoll.h
#includeunistd.h
#includefcntl.h
#includeerr.hpp
#include Sock.hppusing namespace std;namespace EPoll_sv
{static const int defaultport 8080;static const int size 128;static const int defaultvalue -1;static const int defaultnum 64;using func_t functionstring(const string);class EpollServer{public:EpollServer(func_t fun,const int port defaultport) :_num(defaultnum), _port(port), _listensock(defaultvalue), _epfd(defaultvalue),_func(fun){}void handlerEvent(int evs){for(int i0;ievs;i)//直接遍历已经就绪的事件{uint32_t event_reves[i].events;int sock_reves[i].data.fd;if(eventEPOLLET){Sock::setNonBlock(sock);}if(sock_listensock(eventEPOLLIN))//当前是将连接拿上应用层的文件描述符{string clientip;uint16_t clientport;int fdSock::Accpet(_listensock,clientip,clientport);if(fd0){logMessage(NORMAL,accpet sock error);continue;}struct epoll_event ev;ev.data.fdfd;ev.eventsEPOLLIN|EPOLLET;epoll_ctl(_epfd,EPOLL_CTL_ADD,fd,ev);}else if(eventEPOLLIN)//当前是通信的事件{char buffer[1024];int nrecv(sock,buffer,sizeof(buffer)-1,0);if(n0){buffer[n]0;coutclient# bufferendl;string resp_func(buffer);send(sock,resp.c_str(),resp.size(),0);//将数据发送回去给客户端}else if(n0){epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);//将sock从epoll中的结构中移除close(sock);//关闭socklogMessage(NORMAL,client quit);}else{epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);//将sock从epoll中的结构中移除close(sock);//关闭socklogMessage(ERROR,communicate error);}}else{}}// logMessage(DEBUG,handlerEvent out);}void initServer(){// 获取套接字_listensock Sock::Socket();cout Sock success endl;// 绑定网络信息Sock::Bind(_listensock, _port);cout Bind success endl;// 把套接字设置为监听状态Sock::Listen(_listensock);cout Listen success endl;_epfdepoll_create(size);//调用成功返回一个epoll文件描述符size表示是需要监听的文件描述符的数量if(_epfd0){logMessage(FATAL,epoll_create error);exit(EPOLL_CREATE_ERR);}//将listensock添加到epoll模型中struct epoll_event epev;epev.eventsEPOLLIN|EPOLLET;epev.data.fd_listensock;epoll_ctl(_epfd,EPOLL_CTL_ADD,_listensock,epev);//申请就绪时间的空间_revesnew struct epoll_event[_num];//申请一块空间内含_num个事件数logMessage(NORMAL, init server success);}void start(){//等待就绪事件int timeout-1;for(;;){sleep(1);logMessage(DEBUG,epoll_wait ready);int reepoll_wait(_epfd,_reves,_num,timeout);logMessage(DEBUG,epoll_wait end);switch (re){case 0://0个事件就绪即超时重传logMessage(NORMAL,timeout...);break;case -1://epoll_wait函数调用失败logMessage(ERROR,epoll_wait error,code: %d,errstring: %s,errno,strerror(errno)); default://到这里时返回值都大于0即re为已经就绪的事件数logMessage(NORMAL,wait incident success);// handlerEvent(re);break;}}}~EpollServer(){if(_listensock!defaultvalue){close(_listensock);}if(_epfd!defaultvalue){close(_epfd);}if(_reves!nullptr){delete[]_reves;}}private:int _port;int _listensock;int _epfd;struct epoll_event* _reves;int _num;//事件数func_t _func;//外部传递进来的函数};
}static void setNonBlock(int fd)//把文件描述符设置为非阻塞
{int nfcntl(fd,F_GETFL);//获取文件描述符的状态正常返回非-1的标志位出错返回-1if(n0){cerrfcntl :strerror(errno)endl;return ;}fcntl(fd,F_SETFL,n|O_NONBLOCK);//对文件描述符的状态进行设置设置为非阻塞状态
}需要将文件描述符都设置成EPOLLET模式在处理事件函数内若sock是EPOLLET模式就调用setNonBlock将该sock设置成非阻塞。 Sock.hpp #pragma once#includeiostream
#includestring
#includecstring
#includesys/time.h
#include sys/types.h
#include unistd.h
#include sys/socket.h
#include netinet/in.h
#include arpa/inet.h
#include log.hpp
#include err.hppclass Sock
{const static int backlog32;public:static void setNonBlock(int fd)//把文件描述符设置为非阻塞
{int nfcntl(fd,F_GETFL);//获取文件描述符的状态正常返回非-1的标志位出错返回-1if(n0){cerrfcntl :strerror(errno)endl;return ;}fcntl(fd,F_SETFL,n|O_NONBLOCK);//对文件描述符的状态进行设置设置为非阻塞状态
}static int Socket(){int socksocket(AF_INET,SOCK_STREAM,0);//创建套接字if(sock0)//创建失败{logMessage(FATAL,create sock error);exit(SOCKET_ERR);}//创建成功logMessage(NORMAL,create sock success);int opt1;setsockopt(sock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,opt,sizeof(opt));//允许套接字关闭后立刻重启return sock;} static void Bind(int sock,int port){//绑定自己的网络信息struct sockaddr_in local;memset(local,0,sizeof(local));//将结构体清空local.sin_familyAF_INET;//添加协议local.sin_porthtons(port);//添加端口号local.sin_addr.s_addrhtons(INADDR_ANY);//不绑定指定IP可以接收任意IP主机发送来的数据//将本地设置的信息绑定到网络协议栈if (bind(sock,(struct sockaddr*)local,sizeof(local))0){logMessage(FATAL,bind socket error);exit(BIND_ERR);}logMessage(NORMAL,bind socket success);}static void Listen(int sock)//将套接字设置为监听{if(listen(sock,0)0){logMessage(FATAL,listen socket error);exit(LISTEN_ERR);}logMessage(NORMAL,listen socket success);}static int Accpet(int listensock,string * clientip,uint16_t* clientport){struct sockaddr_in cli;socklen_t len sizeof(cli);int sockaccept(listensock,(struct sockaddr*)cli,len);if(sock0){logMessage(FATAL,accept error);//这里accept失败为什么不退出}else{logMessage(NORMAL,accept a new link,get new sock : %d,sock);*clientipinet_ntoa(cli.sin_addr);*clientportntohs(cli.sin_port);}return sock;}
};Sock类内新增了setNonBlock函数用于将传导过来的sock设置成非阻塞。 可以看到底层事件就绪时ET模式下的epollserver只通知了上层一次。
reactor
通过Reactor对底层事件的关心底层有就绪事件就通知上层的Connection类对象调用相关函数处理就绪事件。 Connection类 using func_t functionvoid(Connection *);class Connection{public:Connection(int sock, Tcpserver *tcps) : _consock(sock), _tcps(tcps){}~Connection() {}void Register(func_t r, func_t w, func_t e) // 注册方法表将方法传递进来{_reader r;_writer w;_excepter e;}void Close(){close(_consock);}public:string _inbuffer; // 输入缓冲区string _outbuffer; // 输出缓冲区func_t _reader; // 读操作func_t _writer; // 写操作func_t _excepter; // 处理异常操作int _consock;Tcpserver *_tcps; // 指向Tcpserver对象的指针};每一个文件描述符需要配备独自的堆上的接收缓冲区和输出缓冲区。先描述再组织因此创建了结构体Connection结构体内含文件描述符及其配备的输出缓冲区接收缓冲区该缓冲区类型为string当前该缓冲区只能处理字符串。三个回调函数。三个回调方法通过外部传参。回调方法分别是读事件方法、写事件方法、异常事件方法。由于Connection对象内的回调函数是Tcpserver对象赋予的因此需要先了解一下三个回调函数。 Accepter负责获取连接并将新连接添加到Connection对象内。 void Accepter(Connection *con) // 针对listenfd将获取到的连接从底层拿到应用层{for (;;){logMessage(DEBUG, enter Accepter);string clientip;uint16_t clientport;int err 0;int sock _sock.Accpet(clientip, clientport, err); // 获取成功返回新的文件描述符用于通信客户端的ip和port通过参数返回if (sock 0){// 连接拿上来了将fd添加到con对象中AddConnection(sock, EPOLLIN | EPOLLET,bind(Tcpserver::Recver, this, placeholders::_1),bind(Tcpserver::Sender, this, placeholders::_1),bind(Tcpserver::Excepter, this, placeholders::_1));logMessage(DEBUG, get new link,[%s:%d], clientip.c_str(), clientport);}else{if (err EAGAIN || err EWOULDBLOCK)break; // 读完了else if (err EINTR)continue; // 因为中断继续读elsebreak; // 错误}}}Recver负责读取底层的数据 void Recver(Connection *con) // 读事件{char buffer[1024];while (true){ssize_t i recv(con-_consock, buffer, sizeof(buffer) - 1, 0);if (i 0){buffer[i] 0;con-_inbuffer buffer;//每次读到的数据放到配套的缓冲区内logMessage(DEBUG, recv str: %s, con-_inbuffer.c_str());_func(con);}else if (i 0) // 断开连接异常处理{if (con-_excepter){con-_excepter(con);return;}}else{if (errno EAGAIN || errno EWOULDBLOCK){break; // 读完了}else if (errno EINTR) // 因信号中断继续读{continue;}else{if (con-_excepter){con-_excepter(con);return;}}}}}将每次读取到的数据填充到配备的接收缓冲区读到完整报文后调用_func函数处理数据。 Sender将sock配备的输出缓冲区内的数据发回给客户端。 void Sender(Connection *con) // 写事件{while (true){ssize_t i send(con-_consock, con-_outbuffer.c_str(), sizeof(con-_outbuffer), 0);if (i 0){if (con-_outbuffer.empty()) // 内容当前send函数一次性发完了{logMessage(DEBUG, sender finish);con-_tcps-EnableReadWrite(con, true, false); // 发完了把写通道关闭// sleep(2);break;}else{logMessage(DEBUG, sender not finish);con-_outbuffer.erase(0, i); // 如果一次性没发完那么就将发完的部分删减掉剩余的下次再发}}else{if (errno EAGAIN || errno EWOULDBLOCK) // 上次发完了这次再发就会err为这两个字段{break;}else if (errno EINTR){continue; // 因信号中断了继续发送}else{logMessage(DEBUG, excepter);if (con-_excepter) // 异常了执行异常事件{con-_excepter(con);return;}}}} Tcpserver.hpp #pragma once#include iostream
#include sys/select.h
#include string
#include functional
#include sys/epoll.h
#include unordered_map
#include assert.h
#include Epoller.hpp
#include err.hpp
#include Sock.hpp
#include until.hppusing namespace std;namespace TCP_sv
{static const int defaultport 8080;static const int Gnum 64;class Tcpserver;class Connection;using func_t functionvoid(Connection *);class Connection{public:Connection(int sock, Tcpserver *tcps) : _consock(sock), _tcps(tcps){}~Connection() {}void Register(func_t r, func_t w, func_t e) // 注册方法表将方法传递进来{_reader r;_writer w;_excepter e;}void Close(){close(_consock);}public:string _inbuffer; // 输入缓冲区string _outbuffer; // 输出缓冲区func_t _reader; // 读操作func_t _writer; // 写操作func_t _excepter; // 处理异常操作int _consock;Tcpserver *_tcps; // 指向Tcpserver对象的指针};class Tcpserver{private:void Recver(Connection *con) // 读事件{char buffer[1024];while (true){ssize_t i recv(con-_consock, buffer, sizeof(buffer) - 1, 0);if (i 0){buffer[i] 0;con-_inbuffer buffer;//每次读到的数据放到配套的缓冲区内logMessage(DEBUG, recv str: %s, con-_inbuffer.c_str());_func(con);}else if (i 0) // 断开连接异常处理{if (con-_excepter){con-_excepter(con);return;}}else{if (errno EAGAIN || errno EWOULDBLOCK){break; // 读完了}else if (errno EINTR) // 因信号中断继续读{continue;}else{if (con-_excepter){con-_excepter(con);return;}}}}}void Sender(Connection *con) // 写事件{while (true){ssize_t i send(con-_consock, con-_outbuffer.c_str(), sizeof(con-_outbuffer), 0);if (i 0){if (con-_outbuffer.empty()) // 内容当前send函数一次性发完了{logMessage(DEBUG, sender finish);con-_tcps-EnableReadWrite(con, true, false); // 发完了把写通道关闭// sleep(2);break;}else{logMessage(DEBUG, sender not finish);con-_outbuffer.erase(0, i); // 如果一次性没发完那么就将发完的部分删减掉剩余的下次再发}}else{if (errno EAGAIN || errno EWOULDBLOCK) // 上次发完了这次再发就会err为这两个字段{break;}else if (errno EINTR){continue; // 因信号中断了继续发送}else{logMessage(DEBUG, excepter);if (con-_excepter) // 异常了执行异常事件{con-_excepter(con);return;}}}}}void Excepter(Connection *con) // 异常事件{logMessage(DEBUG, enter excepter);_epoller.Control(con-_consock, 0, EPOLL_CTL_DEL);con-Close();_Connections.erase(con-_consock);logMessage(DEBUG, out excepter);delete con;}void Accepter(Connection *con) // 针对listenfd将获取到的连接从底层拿到应用层{for (;;){logMessage(DEBUG, enter Accepter);string clientip;uint16_t clientport;int err 0;int sock _sock.Accpet(clientip, clientport, err); // 获取成功返回新的文件描述符用于通信客户端的ip和port通过参数返回if (sock 0){// 连接拿上来了将fd添加到con对象中AddConnection(sock, EPOLLIN | EPOLLET,bind(Tcpserver::Recver, this, placeholders::_1),bind(Tcpserver::Sender, this, placeholders::_1),bind(Tcpserver::Excepter, this, placeholders::_1));logMessage(DEBUG, get new link,[%s:%d], clientip.c_str(), clientport);}else{if (err EAGAIN || err EWOULDBLOCK)break; // 读完了else if (err EINTR)continue; // 因为中断继续读elsebreak; // 错误}}}bool Isexist(int sock){auto iter _Connections.find(sock);return iter ! _Connections.end(); // 判断sock是否存在connection集合中}void AddConnection(int sock, uint32_t event, func_t reader, func_t writer, func_t excepter){if (event EPOLLET) // 如果是ET模式就将文件描述符设置为非阻塞Until::setNonBlock(sock);Connection *con new Connection(sock, this);con-Register(reader, writer, excepter); // 把外面的函数传进去初始化内部函数bool n _epoller.Add_Event(sock, event); // 告诉内核需要监管那些事件--将fd和事件添加到epoll模型中logMessage(DEBUG, Add event num: %d, n);assert(n);(void)n;_Connections.insert(pairint, Connection *(sock, con)); // 将fd和con对象添加到map中进行管理}void Loop(int timeout){// logMessage(DEBUG,enter Loop);int n _epoller.Wait(_revs, _num, timeout); // 获取已经就绪的事件for (int i 0; i n; i) //遍历就绪事件 // epoll_wait出错n是-1此时i不n,就进不去for循环{ // 拿到就绪事件的fd和event// sleep(2);// logMessage(DEBUG, enter Loop for);//提取sock和eventint sock _revs[i].data.fd;uint32_t event _revs[i].events;//处理异常事件--如果是异常事件那么会进入读事件和写事件但是读事件是不就绪的即读出错就会走到处理异常的代码区。
//同样的写事件也会发送写出错走到处理异常的代码区if (event EPOLLERR)// logMessage(DEBUG, event EPOLLERR);event | (EPOLLIN | EPOLLOUT);if (event EPOLLHUP)// logMessage(DEBUG, event EPOLLHUP);event | (EPOLLIN | EPOLLOUT); // 如果事件异常了就将该事件设置为读写事件// listenfd事件就绪if ((event EPOLLIN) (Isexist(sock)) (_Connections[sock]-_reader)){logMessage(DEBUG, con-_reader);_Connections[sock]-_reader(_Connections[sock]);}if ((event EPOLLOUT) (Isexist(sock)) (_Connections[sock]-_writer)){logMessage(DEBUG, con-_writer);_Connections[sock]-_writer(_Connections[sock]);}}// logMessage(DEBUG, quit Loop);}public:Tcpserver(func_t fun, int port defaultport) : _port(port), _func(fun){}void inittcpserver(){logMessage(DEBUG, enter inittcpserver\n);// 1.创建文件描述符_sock.Socket(); // 创建文件描述符--用于建立连接_sock.Bind(_port); // 绑定端口号和ip_sock.Listen(); // 将文件描述符设置为监视状态// 2.创建epoll模型_epoller.Create();//3.将listenfd添加到con对象即添加到epoll中并且注册配备的缓冲区和回调函数AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,\bind(Tcpserver::Accepter, this, placeholders::_1), nullptr, nullptr); // 对于listenfd来说只关心读取事件logMessage(DEBUG, quit inittcpserver\n);_revs new struct epoll_event[Gnum]; // 创建一个事件集合供后续存放已经就绪的事件使用_num Gnum;}void EnableReadWrite(Connection *con, bool readable, bool writable){//判断uint32_t event (readable ? EPOLLIN : 0) | (writable ? EPOLLOUT : 0) | EPOLLET;_epoller.Control(con-_consock, event, EPOLL_CTL_MOD);}void Distribute()//事件派发s{logMessage(DEBUG, enter Distribute);while (true){Loop(-1);}logMessage(DEBUG, quit Distribute);}~Tcpserver(){_epoller.Close();if (_revs ! nullptr)delete[] _revs;}private:uint16_t _port;Sock _sock;Epoller _epoller;unordered_mapint, Connection * _Connections; // 建立sock和connection对象的映射表struct epoll_event *_revs; // 用来存储返回的事件func_t _func;int _num; // 可监管的事件总数};
}该代码中的bind用法是bind类内成员函数。用法是第一个参数需要传递类内成员函数对象的指针第二个参数需要传递类对象的指针后面才是传递类成员函数需要的参数。对一个成员函数对象使用bind后形成一个新的函数对象。例如调用AddConnection函数时第三四五参数是仿函数对象通过bind将传入的类内成员函数对象转换为仿函数对象。以传递Recver函数为例在bind表达式内第一个参数传递Recver函数对象指针第二个参数传递Tcpserver类指针第三个参数传递的是需要传递给Recver函数对象的参数即con指针。通过bind将Tcpserver内的Recver成员函数转换为Connection类内的_reader成员函数。 建立sock和Connection对象的映射关系以sock作为key值Connection对象的指针作为value值建立unordered_map数据结构进行组织管理Connection对象内是针对key值的sock配备的缓冲区处理函数。 在epoll中读取到数据处理完后不能立刻发送回给客户端。因为发送缓冲区是否具有空间是未知的。服务器启动后发送条件是就绪的是可以直接发送但会存在一次性发送完的情况可以下一次调用sender的时候再发送这就要求每一个sock需要配备自己的发送缓冲区。并且将发送事件注册到epoll中让epoll管理。由于服务器的需求以接收事件居多发送事件相比需求不大因此对于epoll来说接收事件是常规设置发送事件是按需设置。 main.cc #include iostream
#include functional
#include vector
#include memory
#include err.hpp
#include tcpserver.hpp
#include protocol.hpp
using namespace std;
using namespace TCP_sv;static void Usage(string proc)
{cerr Usage:\n\t proc port \n\n;
}string resp(const string s)
{return s;
}bool cal(const Request req, Response resp)
{// req已经有结构化完成的数据啦你可以直接使用resp._exitcode NONE;resp._result NONE;switch (req._op){case :resp._result req._x req._y;break;case -:resp._result req._x - req._y;break;case *:resp._result req._x * req._y;break;case /:{if (req._y 0)resp._exitcode DIV_ZERO;elseresp._result req._x / req._y;}break;case %:{if (req._y 0)resp._exitcode MOD_ZERO;elseresp._result req._x % req._y;}break;default:resp._exitcode OP_ERR;break;}return true;
}void calculate(Connection *conn)
{string onepackage;// 从完整报文中取出有效载荷while (handleOnePackage(conn-_inbuffer, onepackage))// 如果是读到一个完整的报文就进入while循环体内对报文进行处理形成响应返回给client端{//去报头string req_str;if(!deLength(onepackage,req_str)){logMessage(FATAL,delength err);return;}cout 有效载荷: req_str endl;// 反序列化用有效载荷去构造req对象Request req;if (!req.Deserialize(req_str)){logMessage(FATAL, Deserialize err);return;}// 用req对象的成员去构造resp对象--处理函数然后构造响应Response resp;cal(req, resp);string respstr;// 用resp对象去序列化出一个报文if (!resp.Serialize(respstr)){logMessage(FATAL, resp serialize err);return;}// 将报文加上报头然后填充到输出缓冲区中conn-_outbuffer enLength(respstr);cout -----result: conn-_outbuffer endl;}//if (conn-_writer)conn-_writer(conn);if (!conn-_outbuffer.empty()) // 如果没有发送完conn-_tcps-EnableReadWrite(conn, true, true);//如果这次数据没发完下次epoll检查就绪事件时当前的sock的写事件还是就绪的那么下次epoll就自动调用写函数继续写elseconn-_tcps-EnableReadWrite(conn, true, false);//通过回调指针调用tcp对象的函数。这次写完了将写事件关闭
}int main(int argc, char *argv[])
{if (argc ! 2){Usage(argv[0]);exit(USAGE_ERR);}uint16_t port atoi(argv[1]);unique_ptrTcpserver selsv(new Tcpserver(calculate, port));selsv-inittcpserver();selsv-Distribute();return 0;
}protocol.hpp #pragma once
#includeiostream
#includestring
#include sys/types.h
#include sys/socket.h
#include cstring
#include jsoncpp/json/json.h
#includelog.hpp
using namespace std;#define SEP
#define SEP_LEN strlen(SEP)//strlen统计\0之前的字符个数而sizeof统计的是所占内存的空间大小使用sizeof会越界出问题
#define LINE_SEP \r\n
#define LINE_SEP_LEN strlen(LINE_SEP)enum {NONE0,DIV_ZERO,MOD_ZERO,OP_ERR
};
//x op y-text_len\r\nx op y\r\n---給内容加上报头
std::string enLength(const std::string text)//协议定制
{std::string send_strto_string(text.size());send_strLINE_SEP;send_strtext;send_strLINE_SEP;return send_str;
}
//text_len\r\nx op y\r\n - x op y---去掉报头取出里面的内容
bool deLength(const std::string str,string* ret)//协议定制
{auto itstr.find(LINE_SEP);//找到报头if(itstd::string::npos) return false;//如果没找到则直接返回int lenstoi(str.substr(0,it));//取出字符串的长度*retstr.substr(itLINE_SEP_LEN,len);//取出数据return true;
}class Request
{
public:
Request():_x(0),_y(0),_op(0){}
Request(int x,int y,int op):_x(x),_y(y),_op(op){}bool Serialize(std::string* out)//序列化将传入的x op y转化为字符串x op y
{*out;*outto_string(_x);*outSEP;*outto_string(_op);*outSEP;*outto_string(_y);return true;
}bool Deserialize( const string origin)//反序列化将传过来的字符串拆出来传参給_x _op _y
{//_xSEP_opSEP_y- _x,_op,_yauto leftitorigin.find(SEP);coutDeserialize找到了leftSEP: leftitendl;auto rightitorigin.rfind(SEP);coutDeserialize找到了rightSEP: rightitendl;if(leftitstring::npos|| rightitstring::npos) return false;if(leftitrightit) return false;int opsizerightit-leftit-1;coutopsize: opsizeendl;
//1 43 1--leftit1,rightit4,opsizerightit-leftit-14-1-12;
//1 3 1--leftit1,right3,opsizerightit-leftit-13-1-11// if(rightit-(leftitSEP_LEN)!1) return false;if(rightit-(leftitSEP_LEN)!opsize) return false;//号ASCII码是43从char转int被解析成43即stringlen为两位,这里的运算rightit-(leftitSEP_LEN)!1就出问题
//4-(11)2;3-(11)1std::string origin_xorigin.substr(0,leftit);std::string origin_yorigin.substr(rightitSEP_LEN);if(origin_x.empty()) return false;if(origin_y.empty()) return false;coutorigin_x: origin_x origin_y: origin_yendl;_xstoi(origin_x);int opfstoi(origin.substr(leftit,rightit));_opopf;coutopf: opf_op: _opendl;_ystoi(origin_y);return true;}public:int _x;int _y;char _op;
};class Response
{
public:
Response():_exitcode(0),_result(0){}
Response(int exitcode,int result):_exitcode(exitcode),_result(result){}
bool Serialize(string*out)//序列化
{//_exitcode _result -_exitcodeSEP_result
*out;
*outto_string(_exitcode);
*outSEP;
*outto_string(_result);return true;
}bool Deserialize(const string in)//反序列化
{//_exitcodeSEP_result-_exitcode _resultauto posin.find(SEP);
if(posstring::npos) return false;string excstrin.substr(0,pos);
string resstrin.substr(posSEP_LEN);
if(excstr.empty()||resstr.empty()) return false;_exitcodestoi(excstr);
_resultstoi(resstr);
return true;}public:
int _exitcode;//退出码
int _result;//结果
};//text_len\r\nx op y\r\n
bool handleOnePackage(string inbuffer,string*out)
{*out;auto posinbuffer.find(LINE_SEP);//找\r\nif(posstring::npos) return false;//没找到报头和有效载荷之间的分隔符---如果字节流式的报文没读全就继续读string text_leninbuffer.substr(0,pos);//报头是有效载荷的长度int lenstoi(text_len);int totallentext_len.size()LINE_SEP_LEN*2len;//整个报文的长度if(inbuffer.size()totallen) {logMessage(WARNING,got uncomplete message);return false;//报文没读完继续读}logMessage(NORMAL,got complete message);*outinbuffer.substr(0,totallen);inbuffer.erase(0,totallen);return true;}log.hpp #pragma once#include iostream
#include string
#includectime
#include sys/types.h#include unistd.h#include stdio.h
#include stdarg.h
using namespace std;
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
#define ERROR_EPOLL_CREATE 5#define NUM 1024
#define LOG_STR ./logstr.txt
#define LOG_ERR ./log.err
const char* to_str(int level)
{switch(level){case DEBUG: return DEBUG;case NORMAL: return NORMAL;case WARNING: return WARNING;case ERROR: return ERROR;case FATAL: return FATAL;case ERROR_EPOLL_CREATE: return ERROR_EPOLL_CREATE;default: return nullptr;}
}void logMessage(int level, const char* format,...)
{char logprestr[NUM];
snprintf(logprestr,sizeof(logprestr),[%s][%ld][%d],to_str(level),(long int)time(nullptr),getpid());char logeldstr[NUM];
va_list arg;
va_start(arg,format);
vsnprintf(logeldstr,sizeof(logeldstr),format,arg);//arg是logmessage函数列表中的...coutlogprestrlogeldstrendl;}Sock.hpp #pragma once#includeiostream
#includestring
#includecstring
#includesys/time.h
#include sys/types.h
#include unistd.h
#include sys/socket.h
#include netinet/in.h
#include arpa/inet.h
#include log.hpp
#include err.hppconst static int backlog32;
const static int defaultsock-1;
class Sock
{const static int backlog32;public:Sock(int sockdefaultsock):_listensock(sock){}void Socket(){_listensocksocket(AF_INET,SOCK_STREAM,0);//创建套接字if(_listensock0)//创建失败{logMessage(FATAL,create sock error);exit(SOCKET_ERR);}//创建成功logMessage(NORMAL,create sock success,origin sock: %d\n,_listensock);int opt1;setsockopt(_listensock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,opt,sizeof(opt));//允许套接字关闭后立刻重启} void Bind(int port){//绑定自己的网络信息struct sockaddr_in local;memset(local,0,sizeof(local));//将结构体清空local.sin_familyAF_INET;//添加协议local.sin_porthtons(port);//添加端口号local.sin_addr.s_addrhtons(INADDR_ANY);//不绑定指定IP可以接收任意IP主机发送来的数据//将本地设置的信息绑定到网络协议栈if (bind(_listensock,(struct sockaddr*)local,sizeof(local))0){logMessage(FATAL,bind socket error);exit(BIND_ERR);}logMessage(NORMAL,bind socket success);}void Listen()//将套接字设置为监听{if(listen(_listensock,0)0){logMessage(FATAL,listen socket error);exit(LISTEN_ERR);}logMessage(NORMAL,listen socket success);}int Accpet(string * clientip,uint16_t* clientport,int*err){logMessage(DEBUG,enter Accept);*errerrno;struct sockaddr_in cli;socklen_t len sizeof(cli);logMessage(DEBUG,will accept);//拿上来连接后第二次调用到这里调用accept函数阻塞住了难道不是设定了sock是非阻塞了吗11.17.21.24int sockaccept(_listensock,(struct sockaddr*)cli,len);logMessage(DEBUG,accept done);if(sock0){logMessage(FATAL,accept error);//这里accept失败为什么不退出}else{logMessage(NORMAL,accept a new link,get new sock : %d,sock);*clientipinet_ntoa(cli.sin_addr);*clientportntohs(cli.sin_port);}logMessage(DEBUG,quit Accept);return sock;}int Fd(){return _listensock;}private:int _listensock;
};until.hpp #pragma once#includeiostream
#includeunistd.h
#includefcntl.h
#includestring.h
#includecerrno
using namespace std;
class Until
{
public:
static void setNonBlock(int fd)//把文件描述符设置为非阻塞
{int nfcntl(fd,F_GETFL);//获取文件描述符的状态正常返回非-1的标志位出错返回-1if(n0){cerrfcntl :strerror(errno)endl;return ;}fcntl(fd,F_SETFL,n|O_NONBLOCK);//对文件描述符的状态进行设置设置为非阻塞状态
}};void Print_log()
{coutprint_logendl;
}void Download()
{coutdownload_somethingendl;
}Epoller.hpp #pragma once#include iostream
#include sys/select.h
#include string
#include functional
#include sys/epoll.h
#include Sock.hppusing namespace std;
static const int defaultfd -1; // 默认fd
static const int size 128; //
class Epoller
{
public:Epoller(int fd defaultfd) : _epfd(fd) {}~Epoller(){if (_epfd ! defaultfd){close(_epfd);}}void Create(){logMessage(DEBUG, enter epoller create);_epfd epoll_create(size); // 将管理事件数传进去创建一个具有指定事件数的epoll模型if (_epfd 0){logMessage(ERROR, epoll_create error);exit(ERROR_EPOLL_CREATE);}logMessage(DEBUG, out epoller create,_epfd: %d\n, _epfd);}// 用户告知内核需要底层监管那些事件bool Add_Event(int sock, uint16_t event) // 将sock和event添加到epoll模型中{struct epoll_event epv;epv.events event;epv.data.fd sock;int n epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, epv);return n 0;}// 内核告诉用户已经就绪了多少个事件int Wait(struct epoll_event revent[], int num, int timeout) // 监管num个事件就绪事件保存在revent数组中{logMessage(DEBUG, enter epoll_Wait);int n epoll_wait(_epfd, revent, num, timeout);logMessage(DEBUG, quit epoll_Wait,get n: %d, n);// sleep(2);return n; // 内核帮助用户监管事件返回已经就绪的事件数量}bool Control(int sock, uint32_t event, int action){struct epoll_event epv;epv.events event;epv.data.fd sock;int n 0;if (action EPOLL_CTL_MOD){logMessage(NORMAL,enter Control MOD);n epoll_ctl(_epfd, action, sock, epv);}else if (action EPOLL_CTL_DEL){n epoll_ctl(_epfd, action, sock, nullptr);}elsen -1;return n 0;}void Close(){if (_epfd ! defaultfd){close(_epfd);}}private:int _epfd;
};由于该reactor处理的是类似于接收“11”的完整报文的数据返回客户端所得数的业务因此客户端也需要能够具备发送完整报文的能力。 calclient.cc #includeiostream
#includestring
#includememory
#includecalclient.hpp
using namespace std;
using namespace client;
static void Usage(string proc)
{cout\nUsage :\n\tproc serverip serverport\nendl;
}
int main(int argc, char* argv[])
{if(argc!3){Usage(argv[0]);exit(1);}string serveripargv[1];
uint16_t serverportatoi(argv[2]);unique_ptrcalclient tc(new calclient(serverip,serverport));tc-initclient();
tc-start();return 0;
}calclient.hpp #pragma once
#include iostream
#include string
#include cstring
#include sys/types.h
#include sys/socket.h
#include netinet/in.h
#include arpa/inet.h
#include unistd.h
#include ctype.h
#includeprotocol.hpp
using namespace std;
#define NUM 1024
namespace client
{class calclient
{public:
calclient(const string ip,const uint16_t port)
:_sock(-1)
,_port(port)
,_ip(ip)
{}void initclient()
{
//1.创建sockfd
_socksocket(AF_INET,SOCK_STREAM,0);
if(_sock0)
{cerrsocket create errorendl;exit(2);
}
//2.绑定 ip port不显示绑定OS自动绑定
}void start()
{
struct sockaddr_in ser;
bzero(ser,sizeof(ser));
socklen_t lensizeof(ser);
ser.sin_familyAF_INET;
ser.sin_porthtons(_port);
ser.sin_addr.s_addrinet_addr(_ip.c_str());
if(connect(_sock,(struct sockaddr *)ser,len)!0)
{cerrconnect errorendl;
}else
{string line;string inbuffer;while(true){coutmycal: ;//输入xopygetline(cin,line);Request reqParseLine(line);//用xopy取出x op y构造Request对象string context;req.Serialize(context);//序列化用x op y构造字符串xSEPopSEPystring send_strenLength(context);//定制协议---x op y-text_len\r\nx op y\r\n---給内容加上报头coutcalclient send str: send_strendl;send(_sock,send_str.c_str(),send_str.size(),0);//客户端把报文发送給服务器string package;if(!recvPackage(_sock,inbuffer,package)) continue;//服务器处理完数据客户端接收服务器发送来的报文// content_len\r\nexitcode result\r\nstring reser_len;if(!deLength(package,reser_len)) continue;//去报头// content_len\r\nexitcode result\r\n - exitcode resultResponse rep;rep.Deserialize(reser_len);//反序列化//_exitcodeSEP_result-_exitcode _resultcout_exitcode: rep._exitcodeendl;cout_result: rep._resultendl;}
}
}~calclient()
{if(_sock0) close(_sock);
}Request ParseLine(const string line)
{//xopy-取出来到x op y 上
int i0;
int status0;
int numline.size();
string left,right;
char op;while(inum)
{
switch(status)
{case 0:{if(!isdigit(line[i])){opline[i];//取出运算符**status1;}elseleft.push_back(line[i]);//取出左操作数}break;case 1:i;status2;break;case 2:right.push_back(line[i]);break;
}
}
coutleft: stoi(left) op: op right: stoi(right)endl;
return Request(stoi(left),stoi(right),op);//返回Request对象}private:
int _sock;
uint16_t _port;
string _ip;};
} reactor保证了事件就绪还负责了IO并且还完成了业务处理。负责了IO过程业务处理该过程称为半同步。实际上可以将业务放到其他处理逻辑上只负责IO过程这称为半异步。