现在有这么一个场景:我是一个很忙的大老板,我有100个手机,手机来信息了,我的秘书就会告诉我“老板,你的手机来信息了。”我很生气,我的秘书就是这样子,每次手机来信息就只告诉我来信息了,老板赶紧去看。但是她从来不把话说清楚:到底是哪个手机来信息啊!我可有100个手机啊!于是,我只能一个一个手机去查看,来确定到底是哪几个手机来信息了。这就是io复用中select模型的缺点!老板心想,要是秘书能把来信息的手机直接拿到我桌子上就好了,那么我的效率肯定大增(这就是epoll模型)。
那我们先来总结一下select模型的缺点:
单个进程能够监视的文件描述符的数量存在最大限制,通常是1024,当然可以更改数量,但由于select采用轮询的方式扫描文件描述符,文件描述符数量越多,性能越差;(在linux内核头文件中,有这样的定义:#define __fd_setsize 1024)
内核 / 用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销;
select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件;
select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行io操作,那么之后每次select调用还是会将这些文件描述符通知进程。
设想一下如下场景:有100万个客户端同时与一个服务器进程保持着tcp连接。而每一时刻,通常只有几百上千个tcp连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?
粗略计算一下,一个进程最多有1024个文件描述符,那么我们需要开1000个进程来处理100万个客户连接。如果我们使用select模型,这1000个进程里某一段时间内只有数个客户连接需要数据的接收,那么我们就不得不轮询1024个文件描述符以确定究竟是哪个客户有数据可读,想想如果1000个进程都有类似的行为,那系统资源消耗可有多大啊!
针对select模型的缺点,epoll模型被提出来了!
epoll模型的优点
支持一个进程打开大数目的socket描述符
io效率不随fd数目增加而线性下降
使用mmap加速内核与用户空间的消息传递
epoll的两种工作模式
lt(level triggered,水平触发模式)是缺省的工作方式,并且同时支持 block 和 non-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行io操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。比如内核通知你其中一个fd可以读数据了,你赶紧去读。你还是懒懒散散,不去读这个数据,下一次循环的时候内核发现你还没读刚才的数据,就又通知你赶紧把刚才的数据读了。这种机制可以比较好的保证每个数据用户都处理掉了。
et(edge-triggered,边缘触发模式)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,等到下次有新的数据进来的时候才会再次出发就绪事件。简而言之,就是内核通知过的事情不会再说第二遍,数据错过没读,你自己负责。这种机制确实速度提高了,但是风险相伴而行。
epoll模型api
#include /* 创建一个epoll的句柄,size用来告诉内核需要监听的数目一共有多大。当创建好epoll句柄后,它就是会占用一个fd值,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。*/int epoll_create(int size); /*epoll的事件注册函数*/int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); /*等待事件的到来,如果检测到事件,就将所有就绪的事件从内核事件表中复制到它的第二个参数events指向的数组*/int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epoll的事件注册函数epoll_ctl,第一个参数是 epoll_create() 的返回值,第二个参数表示动作,使用如下三个宏来表示:
poll_ctl_add //注册新的fd到epfd中;epoll_ctl_mod //修改已经注册的fd的监听事件;epoll_ctl_del //从epfd中删除一个fd;
struct epoll_event 结构如下:
typedef union epoll_data{ void *ptr; int fd; __uint32_t u32; __uint64_t u64;} epoll_data_t;struct epoll_event { __uint32_t events; /* epoll events */ epoll_data_t data; /* user data variable */};
epoll_event结构体中的events 可以是以下几个宏的集合:
epollin //表示对应的文件描述符可以读(包括对端socket正常关闭);epollout //表示对应的文件描述符可以写;epollpri //表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);epollerr //表示对应的文件描述符发生错误;epollhup //表示对应的文件描述符被挂断;epollet //将epoll设为边缘触发(edge triggered)模式,这是相对于水平触发(level triggered)来说的。epolloneshot//只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到epoll队列里。
epoll的一个简单使用范例
#include #include #include #include #include #include #include #include #include #include #define maxline 5#define open_max 100#define listenq 20#define serv_port 5000#define inftim 1000void setnonblocking(int sock){ int opts; opts=fcntl(sock,f_getfl); if(opts<0) { perror(fcntl(sock,getfl)); exit(1); } opts = opts|o_nonblock; if(fcntl(sock,f_setfl,opts)<0) { perror(fcntl(sock,setfl,opts)); exit(1); }}int main(int argc, char* argv[]){ int i, maxi, listenfd, connfd, sockfd,epfd,nfds, portnumber; ssize_t n; char line[maxline]; socklen_t clilen; if ( 2 == argc ) { if( (portnumber = atoi(argv[1])) < 0 ) { fprintf(stderr,usage:%s portnumber/a/n,argv[0]); return 1; } } else { fprintf(stderr,usage:%s portnumber/a/n,argv[0]); return 1; } //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件 struct epoll_event ev,events[20]; //生成用于处理accept的epoll专用的文件描述符 epfd=epoll_create(256); struct sockaddr_in clientaddr; struct sockaddr_in serveraddr; listenfd = socket(af_inet, sock_stream, 0); //把socket设置为非阻塞方式 //setnonblocking(listenfd); //设置与要处理的事件相关的文件描述符 ev.data.fd=listenfd; //设置要处理的事件类型 ev.events=epollin|epollet; //ev.events=epollin; //注册epoll事件 epoll_ctl(epfd,epoll_ctl_add,listenfd,&ev); bzero(&serveraddr, sizeof(serveraddr)); serveraddr.sin_family = af_inet; char *local_addr=127.0.0.1; inet_aton(local_addr,&(serveraddr.sin_addr));//htons(portnumber); serveraddr.sin_port=htons(portnumber); bind(listenfd,(struct sockaddr *)&serveraddr, sizeof(serveraddr)); listen(listenfd, listenq); maxi = 0; for ( ; ; ) { //等待epoll事件的发生 nfds=epoll_wait(epfd,events,20,500); //处理所发生的所有事件 for(i=0;iif(events[i].data.fd==listenfd)//如果新监测到一个socket用户连接到了绑定的socket端口,建立新的连接。 { connfd = accept(listenfd,(struct sockaddr *)&clientaddr, &clilen); if(connfd<0){ perror(connfd<0); exit(1); } //setnonblocking(connfd); char *str = inet_ntoa(clientaddr.sin_addr); printf(accapt a connection from\n ); //设置用于读操作的文件描述符 ev.data.fd=connfd; //设置用于注测的读操作事件 ev.events=epollin|epollet; //ev.events=epollin; //注册ev epoll_ctl(epfd,epoll_ctl_add,connfd,&ev); } else if(events[i].events&epollin)//如果是已经连接的用户,并且收到数据,那么进行读入。 { printf(epollin\n); if ( (sockfd = events[i].data.fd) < 0) continue; if ( (n = read(sockfd, line, maxline)) < 0) { if (errno == econnreset) { close(sockfd); events[i].data.fd = -1; } else printf(readline error\n); } else if (n == 0) { close(sockfd); events[i].data.fd = -1; } if(n-2) line[n] = '\0'; //设置用于写操作的文件描述符 ev.data.fd=sockfd; //设置用于注测的写操作事件 ev.events=epollout|epollet; //修改sockfd上要处理的事件为epollout //epoll_ctl(epfd,epoll_ctl_mod,sockfd,&ev); } else if(events[i].events&epollout) // 如果有数据发送 { sockfd = events[i].data.fd; write(sockfd, line, n); //设置用于读操作的文件描述符 ev.data.fd=sockfd; //设置用于注测的读操作事件 ev.events=epollin|epollet; //修改sockfd上要处理的事件为epolin epoll_ctl(epfd,epoll_ctl_mod,sockfd,&ev); } } } return 0;}
带et和lt双模式的epoll服务器
#include #include #include #include #include #include #include #include #include #include #include #include #include #define max_event_number 1024 //event的最大数量#define buffer_size 10 //缓冲区大小#define enable_et 1 //是否启用et模式/* 将文件描述符设置为非拥塞的 */int setnonblocking(int fd){ int old_option = fcntl(fd, f_getfl); int new_option = old_option | o_nonblock; fcntl(fd, f_setfl, new_option); return old_option;}/* 将文件描述符fd上的epollin注册到epoll_fd指示的epoll内核事件表中,参数enable_et指定是否对fd启用et模式 */void addfd(int epoll_fd, int fd, bool enable_et){ struct epoll_event event; event.data.fd = fd; event.events = epollin; //注册该fd是可读的 if(enable_et) { event.events |= epollet; } epoll_ctl(epoll_fd, epoll_ctl_add, fd, &event); //向epoll内核事件表注册该fd setnonblocking(fd);}/* lt工作模式特点:稳健但效率低 */void lt_process(struct epoll_event* events, int number, int epoll_fd, int listen_fd){ char buf[buffer_size]; int i; for(i = 0; i < number; i++) //number: 就绪的事件数目 { int sockfd = events[i].data.fd; if(sockfd == listen_fd) //如果是listen的文件描述符,表明有新的客户连接到来 { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listen_fd, (struct sockaddr*)&client_address, &client_addrlength); addfd(epoll_fd, connfd, false); //将新的客户连接fd注册到epoll事件表,使用lt模式 } else if(events[i].events & epollin) //有客户端数据可读 { // 只要缓冲区的数据还没读完,这段代码就会被触发。这就是lt模式的特点:反复通知,直至处理完成 printf(lt mode: event trigger once!\n); memset(buf, 0, buffer_size); int ret = recv(sockfd, buf, buffer_size - 1, 0); if(ret <= 0) //读完数据了,记得关闭fd { close(sockfd); continue; } printf(get %d bytes of content: %s\n, ret, buf); } else { printf(something unexpected happened!\n); } }}/* et工作模式特点:高效但潜在危险 */void et_process(struct epoll_event* events, int number, int epoll_fd, int listen_fd){ char buf[buffer_size]; int i; for(i = 0; i < number; i++) { int sockfd = events[i].data.fd; if(sockfd == listen_fd) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listen_fd, (struct sockaddr*)&client_address, &client_addrlength); addfd(epoll_fd, connfd, true); //使用et模式 } else if(events[i].events & epollin) { /* 这段代码不会被重复触发,所以我么循环读取数据,以确保把socket读缓存的所有数据读出。这就是我们消除et模式潜在危险的手段 */ printf(et mode: event trigger once!\n); while(1) { memset(buf, 0, buffer_size); int ret = recv(sockfd, buf, buffer_size - 1, 0); if(ret < 0) { /* 对于非拥塞的io,下面的条件成立表示数据已经全部读取完毕,此后epoll就能再次触发sockfd上的epollin事件,以驱动下一次读操作 */ if(errno == eagain || errno == ewouldblock) { printf(read later!\n); break; } close(sockfd); break; } else if(ret == 0) { close(sockfd); } else //没读完,继续循环读取 { printf(get %d bytes of content: %s\n, ret, buf); } } } else { printf(something unexpected happened!\n); } }}int main(int argc, char* argv[]){ if(argc <= 2) { printf(usage: ip_address + port_number\n); return -1; } const char* ip = argv[1]; int port = atoi(argv[2]); int ret = -1; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = af_inet; inet_pton(af_inet, ip, &address.sin_addr); address.sin_port = htons(port); int listen_fd = socket(pf_inet, sock_stream, 0); if(listen_fd < 0) { printf(fail to create socket!\n); return -1; } ret = bind(listen_fd, (struct sockaddr*)&address, sizeof(address)); if(ret == -1) { printf(fail to bind socket!\n); return -1; } ret = listen(listen_fd, 5); if(ret == -1) { printf(fail to listen socket!\n); return -1; } struct epoll_event events[max_event_number]; int epoll_fd = epoll_create(5); //事件表大小为5 if(epoll_fd == -1) { printf(fail to create epoll!\n); return -1; } addfd(epoll_fd, listen_fd, true); //使用et模式epoll,将listen文件描述符加入事件表 while(1) { int ret = epoll_wait(epoll_fd, events, max_event_number, -1); if(ret sockfd; int epollfd = ((struct fds*)arg)->epollfd; printf(start new thread to receive data on fd: %d\n, sockfd); char buf[buffer_size]; memset(buf, 0, buffer_size); while(1) { int ret = recv(sockfd, buf,buffer_size-1, 0); if(ret == 0) { close(sockfd); printf(foreigner closed the connection\n); break; } else if(ret < 0) { if(errno = eagain) { reset_oneshot(epollfd, sockfd); printf(read later\n); break; } } else { printf(get content: %s\n, buf); //休眠5秒,模拟数据处理过程 printf(worker working...\n); sleep(5); } } printf(end thread receiving data on fd: %d\n, sockfd);}int main(int argc, char* argv[]){ if(argc <= 2) { printf(usage: ip_address + port_number\n); return -1; } const char* ip = argv[1]; int port = atoi(argv[2]); int ret = -1; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = af_inet; inet_pton(af_inet, ip, &address.sin_addr); address.sin_port = htons(port); int listenfd = socket(pf_inet, sock_stream, 0); if(listenfd < 0) { printf(fail to create socket!\n); return -1; } ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); if(ret == -1) { printf(fail to bind socket!\n); return -1; } ret = listen(listenfd, 5); if(ret == -1) { printf(fail to listen socket\n); return -1; } struct epoll_event events[max_event_number]; int epollfd = epoll_create(5); if(epollfd == -1) { printf(fail to create epoll\n); return -1; } //注意,监听socket listenfd上是不能注册epolloneshot事件的,否则应用程序只能处理一个客户连接!因为后续的客户连接请求将不再触发listenfd的epollin事件 addfd(epollfd, listenfd, false); while(1) { int ret = epoll_wait(epollfd, events, max_event_number, -1); //永久等待 if(ret < 0) { printf(epoll failure!\n); break; } int i; for(i = 0; i < ret; i++) { int sockfd = events[i].data.fd; if(sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); //对每个非监听文件描述符都注册epolloneshot事件 addfd(epollfd, connfd, true); } else if(events[i].events & epollin) { pthread_t thread; struct fds fds_for_new_worker; fds_for_new_worker.epollfd = epollfd; fds_for_new_worker.sockfd = events[i].data.fd; /*新启动一个工作线程为sockfd服务*/ pthread_create(&thread, null, worker, &fds_for_new_worker); } else { printf(something unexpected happened!\n); } } } close(listenfd); return 0;}
epolloneshot模式现象分析:我们继续使用上面的tcp客户端来测试,需要修改一下客户端的sleep时间改为3秒。工作流程就是:客户端第一次发送数据时服务器的接收缓冲区是有足够空间的,然后服务器的工作线程进入5秒的处理数据阶段;3秒后客户端继续发送新数据过来,但是工作线程还在处理数据,没办法立即接收新的数据。2秒后,客户端该线程数据处理完了,开始接收新的数据。可以观察到,我们客户端只使用了同一个线程去处理同一个客户端的请求,符合预期。
瓷介电容器的基本性能参数简单介绍
话筒和扬声器的比较以及压电陶瓷电声转换
我国正式启动原初引力波探测实验:听宇宙第一声啼哭
更改KUKA机器人系统时间的操作步骤
FAA 批准第一批全自动商业无人机飞行
你需要了解Linux编程的epoll
基于LVDS的超高速ADC数据接收设计
新型过滤技术或缩短提取锂的时间
微雪电子树莓派3计算模块精简版介绍
三星携新机强势来袭,1亿像素+60倍变焦+高通骁龙865
热点新闻:荣耀畅玩8A曝光 魅族Note 9搭载骁龙6150平台
迅雷链正式推出了WASM虚拟机模块的源代码和代码库
华为mate9被赞史上最佳手机,外媒给出高度评价和支持!
工业相机对连接器的要求如此严苛?凌科这款Type-C航空插头凭不俗优势备受关注
A12XBionic性能超92%笔记本电脑处理器 或与英特尔分手
WLAN的频段介绍,如何更好地利用2.4GHz和5GHz
中国电信天翼云:打造安全云 为数字中国建设保驾护航
采用新波形技术的LTE系统可以满足未来5G业务需求?
如何从其ADC通道读取模拟输入信号
Ansys宣布通过GlobalFoundries面向GF 22FDX®平台的半导体工具认证