poll运行效率的两个瓶颈已经找出,现在的问题是怎么改进。首先,如果要监听1000个fd,每次poll都要把1000个fd拷入内核,太不科学了,内核干嘛不自己保存已经拷入的fd呢?答对了,epoll就是自己保存拷入的fd,它的api就已经说明了这一点——不是epoll_wait的时候才传入fd,而是通过epoll_ctl把所有fd传入内核再一起wait,这就省掉了不必要的重复拷贝。其次,在epoll_wait时,也不是把current轮流的加入fd对应的设备等待队列,而是在设备等待队列醒来时调用一个回调函数(当然,这就需要“唤醒回调”机制),把产生事件的fd归入一个链表,然后返回这个链表上的fd。
另外,epoll机制实现了自己特有的文件系统eventpollfilesystem
1.内核数据结构
(1)structeventpoll{
spinlock_tlock;
structmutexmtx;
wait_queue_head_twq;/*waitqueueusedbysys_epoll_wait(),调用epoll_wait()时,我们就是睡在了这个等待队列上*/
wait_queue_head_tpoll_wait;/*waitqueueusedbyfile->poll(),这个用于epollfd本事被poll的时候*/
structlist_headrdllist;/*listofreadyfiledescriptors,所有已经ready的epitem都在这个链表里面*/
structrb_rootrbr;/*rbtreerootusedtostoremonitoredfdstructs,所有要监听的epitem都在这里*/
epitem*ovflist;/*存放的epitem都是我们在传递数据给用户空间时监听到了事件*/.
structuser_struct*user;/*这里保存了一些用户变量,比如fd监听数量的最大值等*/
};
通过epoll_ctl接口加入该epoll描述符监听的套接字则属于socketfilesystem,这点一定要注意。每个添加的待监听(这里监听和listen调用不同)都对应于一个epitem结构体,该结构体已红黑树的结构组织,eventpoll结构中保存了树的根节点(rbr成员)。同时有监听事件到来的套接字的该结构以双向链表组织起来,链表头保存在eventpoll中(rdllist成员)。
/*
*eachfiledescriptoraddedtotheeventpollinterfacewillhaveanentryofthistypelinkedtotherbrrbtree.
*/
(2)structepitem{
structrb_noderbn;/*rbtreenodeusedtolinkthisstructuretotheeventpollrbtree*/
structlist_headrdllink;/*链表节点,所有已经ready的epitem都会被链到eventpoll的rdllist中*/
structepitem*next;
structepoll_filefdffd;/*thefiledescriptorinformationthisitemrefersto*/
intnwait;/*numberofactivewaitqueueattachedtopolloperations*/
structlist_headpwqlist;/*listcontainingpollwaitqueues*/
structeventpoll*ep;/*thecontainerofthisitem*/
structlist_headfllink;/*listheaderusedtolinkthisitemtothestructfileitemslist*/
structepoll_eventevent;/*当前的epitem关系哪些events,这个数据是调用epoll_ctl时从用户态传递过来*/
};
(3)structepoll_filefd{
structfile*file;
intfd;};
(4)structeppoll_entry{/*waitstructureusedbythepollhooks*/
structlist_headllink;/*listheaderusedtolinkthisstructuretothestructepitem*/
structepitem*base;/*thebasepointerissettothecontainerstructepitem*/
wait_queue_twait;/waitqueueitemthatwillbelinkedtothetargetfilewaitqueuehead./
wait_queue_head_t*whead;/thewaitqueueheadthatlinkedthewaitwaitqueueitem*/
};//注:后两项相当于等待队列
(5)structep_pqueue{/*wrapperstructusedbypollqueueing*/
poll_tablept;//structpoll_table是一个函数指针的包裹
structepitem*epi;
};
(6)structep_send_events_data{
/*usedbytheep_send_events()functionascallbackprivatedata*/
intmaxevents;
structepoll_event__user*events;
};
各个数据结构的关系如下图:
2.函数调用分析
epoll函数调用关系全局图:
3.函数实现分析
3.1eventpoll_init
epoll是个module,所以先看看module的入口eventpoll_init
[fs/eventpoll.c-->evetpoll_init()](简化后)
staticint__initeventpoll_init(void)
{
epi_cache=kmem_cache_create(eventpoll_epi,sizeof(structepitem),
0,slab_hwcache_align|epi_slab_debug|slab_panic,null,null);
pwq_cache=kmem_cache_create(eventpoll_pwq,
sizeof(structeppoll_entry),0,epi_slab_debug|slab_panic,null,null);
//注册了一个新的文件系统,叫eventpollfs
error=register_filesystem(&eventpoll_fs_type);
eventpoll_mnt=kern_mount(&eventpoll_fs_type);;
}
很有趣,这个module在初始化时注册了一个新的文件系统,叫eventpollfs(在eventpoll_fs_type结构里),然后挂载此文件系统。另外创建两个内核cache(在内核编程中,如果需要频繁分配小块内存,应该创建kmem_cahe来做“内存池”),分别用于存放structepitem和eppoll_entry。
现在想想epoll_create为什么会返回一个新的fd?因为它就是在这个叫做eventpollfs的文件系统里创建了一个新文件!如下:
3.2sys_epoll_create
[fs/eventpoll.c-->sys_epoll_create()]
asmlinkagelongsys_epoll_create(intsize)
{
interror,fd;
structinode*inode;
structfile*file;
error=ep_getfd(&fd,&inode,&file);
/*setupthefileinternaldatastructure(structeventpoll)*/
error=ep_file_init(file);
}
函数很简单,其中ep_getfd看上去是“get”,其实在第一次调用epoll_create时,它是要创建新inode、新的file、新的fd。而ep_file_init则要创建一个structeventpoll结构,并把它放入file->private_data,注意,这个private_data后面还要用到的。
3.3epoll_ctl
epoll_create好了,该epoll_ctl了,我们略去判断性的代码:
[fs/eventpoll.c-->sys_epoll_ctl()]
asmlinkagelong
sys_epoll_ctl(intepfd,intop,intfd,structepoll_event__user*event)
{
structfile*file,*tfile;
structeventpoll*ep;
structepitem*epi;
structepoll_eventepds;
....
epi=ep_find(ep,tfile,fd);//tfile存放要监听的fd对应在rb-tree中的epitem
switch(op){//省略了判空处理
caseepoll_ctl_add:epds.events|=pollerr|pollhup;
error=ep_insert(ep,&epds,tfile,fd);break;
caseepoll_ctl_del:error=ep_remove(ep,epi);break;
caseepoll_ctl_mod:epds.events|=pollerr|pollhup;
error=ep_modify(ep,epi,&epds);break;
}
原来就是在一个“大的结构”(structeventpoll)里先ep_find,如果找到了structepitem,而根据用户操作是add、del、mod调用相应的函数,这些函数在epitem组成红黑树中增加、删除、修改相应节点(每一个监听fd对应一个节点)。很直白。那这个“大结构”是什么呢?看ep_find的调用方式,ep参数应该是指向这个“大结构”的指针,再看ep=file->private_data,我们才明白,原来这个“大结构”就是那个在epoll_create时创建的structeventpoll,具体再看看ep_find的实现,发现原来是structeventpoll的rbr成员(structrb_root),原来这是一个红黑树的根!而红黑树上挂的都是structepitem。
现在清楚了,一个新创建的epoll文件带有一个structeventpoll结构,这个结构上再挂一个红黑树,而这个红黑树就是每次epoll_ctl时fd存放的地方!
3.4sys_epoll_wait
现在数据结构都已经清楚了,我们来看最核心的:
[fs/eventpoll.c-->sys_epoll_wait()]
asmlinkagelongsys_epoll_wait(intepfd,structepoll_event__user*events,intmaxevents,
inttimeout)
{
structfile*file;
structeventpoll*ep;
/*getthestructfile*fortheeventpollfile*/
file=fget(epfd);
/*
*wehavetocheckthatthefilestructureunderneaththefd
*theuserpassedtous_is_aneventpollfile.(所以如果这里是普通的文件fd会出错)
*/
if(!is_file_epoll(file))
gotoeexit_2;
ep=file->private_data;
error=ep_poll(ep,events,maxevents,timeout);
……
}
故伎重演,从file->private_data中拿到structeventpoll,再调用ep_poll
3.5ep_poll()
[fs/eventpoll.c-->sys_epoll_wait()->ep_poll()]
staticintep_poll(structeventpoll*ep,structepoll_event__user*events,intmaxevents,
longtimeout)
{
intres;
wait_queue_twait;//等待队列项
if(list_empty(&ep->rdllist)){
//ep->rdllist存放的是已就绪(read)的fd,为空时说明当前没有就绪的fd,所以需要将当前
init_waitqueue_entry(&wait,current);//创建一个等待队列项,并使用当前进程(current)初始化
add_wait_queue(&ep->wq,&wait);//将刚创建的等待队列项加入到ep中的等待队列(即将当前进程添加到等待队列)
for(;;){
/*将进程状态设置为task_interruptible,因为我们不希望这期间ep_poll_callback()发信号唤醒进程的时候,进程还在sleep*/
set_current_state(task_interruptible);
if(!list_empty(&ep->rdllist)||!jtimeout)//如果ep->rdllist非空(即有就绪的fd)或时间到则跳出循环
break;
if(signal_pending(current)){
res=-eintr;
break;
}
}
remove_wait_queue(&ep->wq,&wait);//将等待队列项移出等待队列(将当前进程移出)
set_current_state(task_running);
}
....
又是一个大循环,不过这个大循环比poll的那个好,因为仔细一看——它居然除了睡觉和判断ep->rdllist是否为空以外,啥也没做!什么也没做当然效率高了,但到底是谁来让ep->rdllist不为空呢?答案是ep_insert时设下的回调函数.
3.6ep_insert()
[fs/eventpoll.c-->sys_epoll_ctl()-->ep_insert()]
staticintep_insert(structeventpoll*ep,structepoll_event*event,structfile*tfile,intfd)
{
structepitem*epi;
structep_pqueueepq;//创建ep_pqueue对象
epi=epi_mem_alloc();//分配一个epitem
/*初始化这个epitem...*/
epi->ep=ep;//将创建的epitem添加到传进来的structeventpoll
/*后几行是设置epitem的相应字段*/
ep_set_ffd(&epi->ffd,tfile,fd);//将要监听的fd加入到刚创建的epitem
epi->event=*event;
epi->nwait=0;
/*initializethepolltableusingthequeuecallback*/
epq.epi=epi;//将一个epq和新插入的epitem(epi)关联
//下面一句等价于&(epq.pt)->qproc=ep_ptable_queue_proc;
init_poll_funcptr(&epq.pt,ep_ptable_queue_proc);
revents=tfile->f_op->poll(tfile,&epq.pt);//tfile代表targetfile,即被监听的文件,poll()返回就绪事件的掩码,赋给revents.
list_add_tail(&epi->fllink,&tfile->f_ep_links);//每个文件会将所有监听自己的epitem链起来
ep_rbtree_insert(ep,epi);//都搞定后,将epitem插入到对应的eventpoll中去
……
}
紧接着tfile->f_op->poll(tfile,&epq.pt)其实就是调用被监控文件(epoll里叫“targetfile”)的poll方法,而这个poll其实就是调用poll_wait(还记得poll_wait吗?每个支持poll的设备驱动程序都要调用的),最后就是调用ep_ptable_queue_proc。(注:f_op->poll()一般来说只是个wrapper,它会调用真正的poll实现,拿udp的socket来举例,这里就是这样的调用流程:f_op->poll(),sock_poll(),udp_poll(),datagram_poll(),sock_poll_wait()。)这是比较难解的一个调用关系,因为不是语言级的直接调用。ep_insert还把structepitem放到structfile里的f_ep_links连表里,以方便查找,structepitem里的fllink就是担负这个使命的。
3.7ep_ptable_queue_proc
[fs/eventpoll.c-->ep_ptable_queue_proc()]
staticvoidep_ptable_queue_proc(structfile*file,wait_queue_head_t*whead,poll_table*pt)
{
structepitem*epi=ep_item_from_epqueue(pt);
structeppoll_entry*pwq;
if(epi->nwait>=0&&(pwq=pwq_mem_alloc())){
init_waitqueue_func_entry(&pwq->wait,ep_poll_callback);
pwq->whead=whead;
pwq->base=epi;
add_wait_queue(whead,&pwq->wait);
list_add_tail(&pwq->llink,&epi->pwqlist);
epi->nwait++;
}else{
/*wehavetosignalthatanerroroccurred*/
epi->nwait=-1;
}
}
上面的代码就是ep_insert中要做的最重要的事:创建structeppoll_entry,设置其唤醒回调函数为ep_poll_callback,然后加入设备等待队列(注意这里的whead就是上一章所说的每个设备驱动都要带的等待队列)。只有这样,当设备就绪,唤醒等待队列上的等待进程时,ep_poll_callback就会被调用。每次调用poll系统调用,操作系统都要把current(当前进程)挂到fd对应的所有设备的等待队列上,可以想象,fd多到上千的时候,这样“挂”法很费事;而每次调用epoll_wait则没有这么罗嗦,epoll只在epoll_ctl时把current挂一遍(这第一遍是免不了的)并给每个fd一个命令“好了就调回调函数”,如果设备有事件了,通过回调函数,会把fd放入rdllist,而每次调用epoll_wait就只是收集rdllist里的fd就可以了——epoll巧妙的利用回调函数,实现了更高效的事件驱动模型。
现在我们猜也能猜出来ep_poll_callback会干什么了——肯定是把红黑树(ep->rbr)上的收到event的epitem(代表每个fd)插入ep->rdllist中,这样,当epoll_wait返回时,rdllist里就都是就绪的fd了!
3.8ep_poll_callback
[fs/eventpoll.c-->ep_poll_callback()]
staticintep_poll_callback(wait_queue_t*wait,unsignedmode,intsync,void*key)
{
intpwake=0;
structepitem*epi=ep_item_from_wait(wait);
structeventpoll*ep=epi->ep;
/*ifthisfileisalreadyinthereadylistweexitsoon*/
if(ep_is_linked(&epi->rdllink))
gotois_linked;
list_add_tail(&epi->rdllink,&ep->rdllist);
is_linked:
/*
*wakeup(ifactive)boththeeventpollwaitlistandthe->poll()
*waitlist.
*/
if(waitqueue_active(&ep->wq))
wake_up(&ep->wq);
if(waitqueue_active(&ep->poll_wait))
pwake++;
}
4.epoll独有的epollet
epollet是epoll系统调用独有的flag,et就是edgetrigger(边缘触发)的意思,具体含义和应用大家可google之。有了epollet,重复的事件就不会总是出来打扰程序的判断,故而常被使用。那epollet的原理是什么呢?
上篇我们讲到epoll把fd都挂上一个回调函数,当fd对应的设备有消息时,回调函数就把fd放入rdllist链表,这样epoll_wait只要检查这个rdllist链表就可以知道哪些fd有事件了。我们看看ep_poll的最后几行代码:
4.1ep_poll()(接3.5)
[fs/eventpoll.c->ep_poll()]
/*trytotransfereventstouserspace.*/
ep_events_transfer(ep,events,maxevents)
......
把rdllist里的fd拷到用户空间,这个任务是ep_events_transfer做的.
4.2ep_events_transfer
[fs/eventpoll.c->ep_events_transfer()]
staticintep_events_transfer(structeventpoll*ep,structepoll_event__user*events,
intmaxevents)
{
inteventcnt=0;
structlist_headtxlist;
init_list_head(&txlist);
/*collect/extractreadyitems*/
if(ep_collect_ready_items(ep,&txlist,maxevents)>0){
/*buildresultsetinuserspace*/
eventcnt=ep_send_events(ep,&txlist,events);
/*reinjectreadyitemsintothereadylist*/
ep_reinject_items(ep,&txlist);
}
up_read(&ep->sem);
returneventcnt;
}
代码很少,其中ep_collect_ready_items把rdllist里的fd挪到txlist里(挪完后rdllist就空了),接着ep_send_events把txlist里的fd拷给用户空间,然后ep_reinject_items把一部分fd从txlist里“返还”给rdllist以便下次还能从rdllist里发现它。
其中ep_send_events的实现:
4.3ep_send_events()
[fs/eventpoll.c->ep_send_events()]
staticintep_send_events(structeventpoll*ep,structlist_head*txlist,
structepoll_event__user*events)
{
inteventcnt=0;
unsignedintrevents;
structlist_head*lnk;
structepitem*epi;
list_for_each(lnk,txlist){
epi=list_entry(lnk,structepitem,txlink);
revents=epi->ffd.file->f_op->poll(epi->ffd.file,null);//调用每个监听文件的poll方法获取就绪事件(掩码),并赋值给revents
epi->revents=revents&epi->event.events;
if(epi->revents){
if(__put_user(epi->revents,&events[eventcnt].events)||__put_user(epi->event.data,
&events[eventcnt].data))//将event从内核空间发送到用户空间
return-efault;
if(epi->event.events&epolloneshot)
epi->event.events&=ep_private_bits;
eventcnt++;
}}
returneventcnt;}
这个拷贝实现其实没什么可看的,但是请注意红色的一行,这个poll很狡猾,它把第二个参数置为null来调用。我们先看一下设备驱动通常是怎么实现poll的:
staticunsignedintscull_p_poll(structfile*filp,poll_table*wait)
{
structscull_pipe*dev=filp->private_data;
unsignedintmask=0;
poll_wait(filp,&dev->inq,wait);
poll_wait(filp,&dev->outq,wait);
if(dev->rp!=dev->wp)
mask|=pollin|pollrdnorm;/*readable*/
if(spacefree(dev))
mask|=pollout|pollwrnorm;/*writable*/
returnmask;
}
上面这段代码摘自《linux设备驱动程序(第三版)》,绝对经典,设备先要把current(当前进程)挂在inq和outq两个队列上(这个“挂”操作是wait回调函数指针做的),然后等设备来唤醒,唤醒后就能通过mask拿到事件掩码了(注意那个mask参数,它就是负责拿事件掩码的)。那如果wait为null,poll_wait会做些什么呢?
4.4poll_wait
[include/linux/poll.h->poll_wait]
staticinlinevoidpoll_wait(structfile*filp,wait_queue_head_t*wait_address,poll_table*p)
{
if(p&&wait_address)
p->qproc(filp,wait_address,p);
}
喏,看见了,如果poll_table为空,什么也不做。我们倒回ep_send_events,那句标红的poll,实际上就是“我不想休眠,我只想拿到事件掩码”的意思。然后再把拿到的事件掩码拷给用户空间。ep_send_events完成后,就轮到ep_reinject_items了。
4.5p_reinject_items
[fs/eventpoll.c->ep_reinject_items]
staticvoidep_reinject_items(structeventpoll*ep,structlist_head*txlist)
{
intricnt=0,pwake=0;
unsignedlongflags;
structepitem*epi;
while(!list_empty(txlist)){//遍历txlist(此时txlist存放的是已就绪的epitem)
epi=list_entry(txlist->next,structepitem,txlink);
ep_list_del(&epi->txlink);//将当前的epitem从txlist中删除
if(ep_rb_linked(&epi->rbn)&&!(epi->event.events&epollet)&&
(epi->revents&epi->event.events)&&!ep_is_linked(&epi->rdllink)){
list_add_tail(&epi->rdllink,&ep->rdllist);//将当前epitem重新加入ep->rdllist
ricnt++;//ep->rdllist中epitem的个数(即从新加入就绪的epitem的个数)
}
}
if(ricnt){//如果ep->rdllist不空,重新唤醒等、等待队列的进程(current)
if(waitqueue_active(&ep->wq))
wake_up(&ep->wq);
if(waitqueue_active(&ep->poll_wait))
pwake++;
}
……
}
ep_reinject_items把txlist里的一部分fd又放回rdllist,那么,是把哪一部分fd放回去呢?看上面那个判断——是那些“没有标上epollet(即默认的lt)”(标红代码)且“事件被关注”(标蓝代码)的fd被重新放回了rdllist。那么下次epoll_wait当然会又把rdllist里的fd拿来拷给用户了。举个例子。假设一个socket,只是connect,还没有收发数据,那么它的poll事件掩码总是有pollout的(参见上面的驱动示例),每次调用epoll_wait总是返回pollout事件(比较烦),因为它的fd就总是被放回rdllist;假如此时有人往这个socket里写了一大堆数据,造成socket塞住(不可写了),那么标蓝色的判断就不成立了(没有pollout了),fd不会放回rdllist,epoll_wait将不会再返回用户pollout事件。现在我们给这个socket加上epollet,然后connect,没有收发数据,此时,标红的判断又不成立了,所以epoll_wait只会返回一次pollout通知给用户(因为此fd不会再回到rdllist了),接下来的epoll_wait都不会有任何事件通知了。
总结:
epoll函数调用关系全局图:
注:上述函数关系图中有个问题,当ep_reinject_items()将lt的上次就绪的eptiem重新放回就绪链表,下次ep_poll()直接返回,这不就造成了一个循环了吗?什么时候这些lt的epitem才不再加入就绪链表呢?这个问题的解决在4.3——ep_send_events()中,注意这个函数中标红的那个poll调用,我们分析过当传入null时,poll仅仅是拿到事件掩码,所以如果之前用户对事件的处理导致的文件的revents(状态)改变,那么这里就会得到更新。例如:用户以可读监听,当读完数据后文件的会变为不可读,这时ep_send_events()中获取的revents中将不再有可读事件,也就不满足ep_reinject_items()中的蓝色判断,所以epitem不再被加入就绪链表(ep->rdllist)。但是如果只读部分数据,并不会引起文件状态改变(文件仍可读),所以仍会加入就绪链表通知用户空间,这也就是如果是tl,就会一直通知用户读事件,直到某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个ewouldblock错误)。
将上述调用添加到函数调用关系图后,如下(添加的为蓝线):
epoll实现数据结构全局关系图:
利用人工智能解决企业的网络安全问题
PMOS管放倒灌电路讲解(电路图+工作原理)
高速PCB设计规则总结及原因分析(图文)
农药检测仪器的应用可有效提升粮食作物产品的质量
中兴通讯完成首个虚拟化DPI验证 商汤“工业引擎”加码京沪高铁巡检
poll&&epoll之epoll实现
一文详解非正弦周期波
无线运营商被质疑,“载波恶化”与载波聚合问题凸显
浅谈激光焊接设备在新能源汽车领域中的应用
2017年中国手机AP销售增长 联发科首季发力市占回升
基于FPGA的数字波束形成技术的工程实现
51单片机常见的7种时钟电路介绍
英特尔公布1万亿参数AI模型,比ChatGPT大570%
基于Z1510与A/D转换单元的音视频压缩系统的设计
FPGA对芯片有何影响 FPGA如何实现连接?
我国5G毫米波技术已驶入商用冲刺阶段
华为起诉联邦通信 旗舰手机5G基站已不包含任何美国制造零件
为什么PCB要使用高Tg材料
B4系列电化学传感器对气体氮氧化物中的检测
矿井人员安全虹膜识别系统的功能和应用设计