dw学生个人网页制作视频,旺道seo网站优化大师,郴州网页设计招聘,深圳网站建设定制开发 超凡科技这里只展示了server端#xff0c;client端可以用之前的poll写的。 每个client我们fork一个子进程用epoll来实现它的I/O复用。 非常巧妙的使用共享内存#xff0c;通过给每个client编号以及BUFFER_SIZE保存需要广播和接受的内容#xff0c;因为有了编号#xff0c;所以父子进…这里只展示了server端client端可以用之前的poll写的。 每个client我们fork一个子进程用epoll来实现它的I/O复用。 非常巧妙的使用共享内存通过给每个client编号以及BUFFER_SIZE保存需要广播和接受的内容因为有了编号所以父子进程的socketpair通信我们只要传编号就可以表示这个client需要广播的内容了。 最后就是里面注册的信号量父进程不能直接说关闭就关闭否则没有及时关闭的子进程会变成僵尸进程所以我们通过注册的信号量来让系统走我们把所以子进程都关闭再关闭自己的逻辑。
缺点 当一部分client频繁发送时容易出现所处的共享内存上的buffer未发出但是新的又来了所以可能会导致吞消息的现象这时我们需要设置缓冲区来解决下次一定。
#include sys/stat.h
#include sys/socket.h
#include sys/wait.h
#include sys/mman.h
#include netinet/in.h
#include arpa/inet.h
#include assert.h
#include stdio.h
#include set
#include unistd.h
#include errno.h
#include string.h
#include fcntl.h
#include stdlib.h
#include sys/epoll.h
#include sys/shm.h
#include signal.h#define USER_LIMIT 5 //最大用户数量
#define BUFFER_SIZE 1024 //读缓冲区的大小
#define FD_LIMIT 65535 //文件描述符数量限制
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 6553665536
//客户数据 socket地址、待写到客户端的数据的位置、从客户端读入的数据
struct client_data
{sockaddr_in address;int connfd; ///socket文件描述符pid_t pid; //处理这个连接的子进程pidint pipefd[2]; //与父进程通信的管道
};
static const char* shm_name /my_shm;
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;
char* share_men 0;
//客户连接的数组进程用客户连接编号来索引获得相关的数据
client_data* users 0;
//子进程和客户的连接映射关系表用进程的pid来索引数据获取该进程处理的客户连接编号
int* sub_process 0;
int user_counet 0 ;
std::setint nost;
//当前客户数量
std::setint curst;
bool stop_child false;int setnoblocking(int fd)
{int old_option fcntl(fd,F_GETFL);int new_option old_option | O_NONBLOCK;fcntl(fd,F_SETFL,new_option);return old_option;
}
void addfd(int epollfd,int fd)
{epoll_event event ;event.data.fd fd ;event.events EPOLLIN | EPOLLET;epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,event);setnoblocking(fd);
}
void sig_handler(int sig)
{int save_errno errno;int msg sig;send(sig_pipefd[1],(char *)msg,1,0);errno save_errno;
}
void addsig(int sig , void(*handler)(int) ,bool restart true )
{struct sigaction sa ;memset(sa,\0,sizeof(sa));sa.sa_handler handler;if ( restart )sa.sa_flags | SA_RESTART;sigfillset(sa.sa_mask);assert(sigaction(sig,sa,NULL) ! 1 );
}
void del_resource()
{close(sig_pipefd[0]);close(sig_pipefd[1]);close(listenfd);close(epollfd);shm_unlink(shm_name);delete [] users;delete sub_process;
}
void child_term_handler(int sig)
{stop_child true;
}
//子进程运行的函数参数idx指出该子进程处理的客户连接的编号user是保存所有客户连接数据的数组参数share_men指出共享内存的起始位置
int run_child(int idx,client_data* users,char* share_mem)
{epoll_event events[MAX_EVENT_NUMBER];//子进程使用I/O复用技术来同时监控两个文件描述符:客户连接socket、与父进程通信的管道文件描述符int child_epollfd epoll_create(5);assert(child_epollfd ! -1);int connfd users[idx].connfd;addfd(child_epollfd,connfd);int pipefd users[idx].pipefd[1];addfd(child_epollfd,pipefd);int ret;//子进程设置自己的信号处理函数addsig(SIGTERM,child_term_handler,false);while ( !stop_child ){int number epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);if ((number 0) (errno ! EINTR)){printf(epoll failure\n);break;}for (int i 0 ; i number ; i ){int sockfd events[i].data.fd;//本子进程负责的客户连接有数据到达if ( (sockfd connfd ) ( events[i].events EPOLLIN) ){memset(share_mem idx*BUFFER_SIZE ,\0,BUFFER_SIZE);//将客户数据读取到对应的读缓存中该读缓存时共享内存的一段ret recv(connfd,share_memidx*BUFFER_SIZE,BUFFER_SIZE-1,0);if ( ret 0 ){if ( errno ! EAGAIN ){stop_child true;}}else if ( ret 0 ){stop_child true;}else{send(pipefd,(char *)idx,sizeof(idx),0);}}else if ( (sockfd pipefd) (events[i].events EPOLLIN) ){int client 0 ;//接受主进程发来的数据ret recv(sockfd,(char *)client , sizeof(client),0);if ( ret 0 ){if ( errno ! EAGAIN ){stop_child true;}}else if ( ret 0 ){stop_child true;}else{send(connfd,share_memclient*BUFFER_SIZE,BUFFER_SIZE,0);}}}}close(connfd);close(pipefd);close(child_epollfd);return 0;
}
int main()
{const char* ip 192.168.174.129 ;int port 5050 ;int ret 0;sockaddr_in address;bzero(address,sizeof(address));address.sin_family AF_INET;inet_pton(AF_INET,ip,address.sin_addr);address.sin_port htons(port);listenfd socket(PF_INET,SOCK_STREAM,0);assert(listenfd 0 );ret bind(listenfd,(struct sockaddr *)address,sizeof(address));assert(ret ! -1);ret listen(listenfd,5);assert(ret ! -1);user_counet 0;users new client_data [USER_LIMIT1];sub_process new int[PROCESS_LIMIT];for (int i 1 ; i PROCESS_LIMIT ; i )sub_process[i] -1;for (int i 0 ; i USER_LIMIT ; i )nost.insert(i);epoll_event ev;epoll_event events[MAX_EVENT_NUMBER];epollfd epoll_create(USER_LIMIT);assert(epollfd ! -1);addfd(epollfd,listenfd);ret socketpair(PF_UNIX,SOCK_STREAM,0,sig_pipefd);assert(ret ! -1);setnoblocking(sig_pipefd[1]);addfd(epollfd,sig_pipefd[0]);addsig(SIGCLD,sig_handler);addsig(SIGTERM,sig_handler);addsig(SIGINT,sig_handler);addsig(SIGPIPE,SIG_IGN);bool stop_server false;bool terminate false;//创建共享内存作为客户socket连接的读缓存shmfd shm_open(shm_name,O_CREAT|O_RDWR,0666);assert(shmfd ! -1);ret ftruncate(shmfd,USER_LIMIT*BUFFER_SIZE);assert(ret ! -1);share_men (char *) mmap(NULL,USER_LIMIT*BUFFER_SIZE,PROT_READ|PROT_WRITE,MAP_SHARED,shmfd,0);assert(share_men ! MAP_FAILED);close(shmfd);while ( !stop_server ){int number epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);if ( (number 0 ) ( errno ! EINTR ) ){printf(epoll falure\n);break;}for (int i 0 ; i number; i ){int sockfd events[i].data.fd;if ( sockfd listenfd ){struct sockaddr_in client_address;socklen_t client_addrlength sizeof(client_address);int connfd accept(listenfd,(struct sockaddr*)client_address,client_addrlength);if ( connfd 0 ) {printf(errno is : %d \n, errno);continue;}if ( curst.size() USER_LIMIT ){const char * info too many users\n;printf(%s,info);send(connfd,info, strlen(info),0);close(connfd);continue;}user_counet *nost.begin();nost.erase(nost.begin());curst.insert(user_counet);users[user_counet].address client_address;users[user_counet].connfd connfd;ret socketpair(PF_UNIX,SOCK_STREAM,0,users[user_counet].pipefd);assert(ret ! -1);pid_t pid fork();if ( pid 0 ){close(connfd);continue;}else if ( pid 0 ){close(epollfd);close(listenfd);close(users[user_counet].pipefd[0]);close(sig_pipefd[0]);close(sig_pipefd[1]);run_child(user_counet,users,share_men);munmap((void*)share_men,USER_LIMIT*BUFFER_SIZE);exit(0);}else{close(connfd);close(users[user_counet].pipefd[1]);addfd(epollfd,users[user_counet].pipefd[0]);users[user_counet].pid pid;printf(client %d join , now curclient %d \n,user_counet,curst.size());sub_process[pid] user_counet;}}else if ( (sockfd sig_pipefd[0]) (events[i].events EPOLLIN)){int sig;char signals[1024];printf(recv sig !!!\n);ret recv(sig_pipefd[0],signals,sizeof(signals),0);if ( ret -1 ){continue;}else if ( ret 0 ){continue;}else{for (int k 0 ; k ret; k ){switch (signals[k]){case SIGCHLD:{pid_t pid;int stat;while ( (pid waitpid(-1,stat,WNOHANG)) 0 ){//用子进程的pid取需要关闭的客户连接idint del_user sub_process[pid];sub_process[pid] -1;if ( ( del_user 0 ) || ( del_user USER_LIMIT ) ){continue;}printf(close : %d \n,del_user);nost.insert(del_user);curst.erase(del_user);//清除第del_user个客户端连接使用的相关数据epoll_ctl(epollfd,EPOLL_CTL_DEL,users[del_user].pipefd[0],0);close(users[del_user].pipefd[0]);users[del_user] users[USER_LIMIT];sub_process[users[del_user].pid] del_user;}if ( terminate curst.empty() ){stop_server true;}break;}case SIGTERM:case SIGINT:{//结束服务器程序printf(kill all the child now\n);if ( curst.empty() ){stop_server true ;break;}for (auto j : curst){int pid users[j].pid;kill(pid,SIGTERM);}terminate true ;break;}default:{break;}}}}}//某个子进程向父进程写了数据else if ( events[i].events EPOLLIN ){int child 0;//读取管道数据child变量记录了是哪个客户连接有数据可达ret recv(sockfd,(char *)child,sizeof(child),0);//printf(read data from child accross pipe\n);if ( ret -1 ){continue;}else if ( ret 0 ){continue;}else{printf(read data from child accross pipe %s \n,share_menchild*BUFFER_SIZE);//向除了负责第child个客户连接的子进程之外的其他进程发送消息通知他们的客户端有数据要写for (auto j : curst){if ( users[j].pipefd[0] ! sockfd ){printf(send data to child accross pipe\n);send(users[j].pipefd[0],(char* )child,sizeof(child),0);}}}}}}del_resource();return 0;
}