admin

Redis源码学习之事件驱动

admin 运维技术 2022-11-19 440浏览 0

Redis基于多路复用技术实现了一套简单的事件驱动库,代码在ae.h、ae.c以及ae_epoll.c、ae_evport.c和ae_kqueue.c、ae_select.c这几个文件中。其中ae表示的是antirez eventloop的意思。

Redis里面包含两种事件类型:FileEvent和TimeEvent。

Redis采用IO多路复用技术,所有的事件都是在一个线程中进行处理。Redis的事件驱动模型可以以以下为代码进行表示:

intmain(intargc,char**argv)

{

while(true){

//等待事件到来:wait4Event();

//处理事件:processEvent()

}

}

在一个死循环中等待事件的到来,然后对事件进行处理,以此往复。这就是一个最经典的网络编程模型。

1.基本数据结构

aeEventLoop

Redis源码学习之事件驱动

aeEventLoop是Redis中事件驱动模型的核心,封装了整个事件循环,其中每个字段解释如下:

  • maxfd:已经接受的最大的文件描述符。
  • setsize:当前循环中所能容纳的文件描述符的数量。
  • timeEventNextId:下一个时间事件的ID.
  • lastTime:上一次被访问的时间,用来检测系统时钟是否被修改。
  • events:指针,指向保存所有注册的事件的数组首地址。
  • fired:指针,保存所有已经买被触发的事件的数组首地址。
  • timeEventHead:Redis用一个链表来存储所有的时间事件,timeEventHead是指向这个链表的首节点指针。
  • stop:停止整个事件循环。
  • apiData:指针,指向epoll结构。
  • beforeSleep:函数指针。每次实现循环的时候,在阻塞直到时间到来之前,会先调用这个函数。

aeFileEvent和aeTimeEvent

这两个结构分别表示文件事件和时间事件,定义如下

typedefstructaeFileEvent{

intmask;/*oneofAE_(READABLE|WRITABLE)*/

aeFileProc*rfileProc;//函数指针,写事件处理

aeFileProc*wfileProc;//函数指针,读事件处理

void*clientData;//具体的数据

}aeFileEvent;

其中mask表示文件事件类型掩码,可以是AE_READABLE表示是可读事件,AE_WRITABLE为可写事件。aeFileProc是函数指针。

/*Timeeventstructure*/

typedefstructaeTimeEvent{

longlongid;//事件ID

longwhen_sec;//事件触发的时间:s

longwhen_ms;//事件触发的时间:ms

aeTimeProc*timeProc;//函数指针

aeEventFinalizerProc*finalizerProc;//函数指针:在对应的aeTieEvent节点被删除前调用,可以理解为aeTimeEvent的析构函数

void*clientData;//指针,指向具体的数据

structaeTimeEvent*next;//指向下一个时间事件指针

}aeTimeEvent;

aeFiredEvent

aeFiredEvent结构表示一个已经被触发的事件,结果如下:

/*Afiredevent*/

typedefstructaeFiredEvent{

intfd;//事件被触发的文件描述符

intmask;//被触发事件的掩码,表示被触发事件的类型

}aeFiredEvent;

fd表示事件发生在哪个文件描述符上面,mask用来表示具体事件的类型。

aeApiState

Redis底层采用IO多路复用技术实现高并发,具体实现可以采用kqueue、select、epoll等技术。对于Linux来说,epoll的性能要优于select,所以以epoll为例来进行分析。

typedefstructaeApiState{

intepfd;

structepoll_event*events;

}aeApiState;

aeApiState封装了跟epoll相关的数据,epfd保存epoll_create()返回的文件描述符。

具体实现细节

事件循环启动:aeMain()

事件驱动的启动代码位于ae.c的aeMain()函数中,代码如下:

Redis源码学习之事件驱动

从aeMain()方法中可以看到,整个事件驱动是在一个while()循环中不停地执行aeProcessEvents()方法,在这个方法中执行从客户端发送过来的请求。

初始化:aeCreateEventLoop()

aeEventLoop的初始化是在aeCreateEventLoop()方法中进行的,这个方法是在server.c中的initServer()中调用的。实现如下:

Redis源码学习之事件驱动

在这个方法中主要就是给aeEventLoop对象分配内存然后并进行初始化。其中关键的地方有:

1、调用aeApiCreate()初始化epoll相关的数据。aeApiCreate()实现如下:

Redis源码学习之事件驱动

在aeApiCreate()方法中主要完成以下三件事:

  1. 分配aeApiState结构需要的内存。
  2. 调用epoll_create()方法生成epoll的文件描述符,并保存在aeApiState.epfd字段中。
  3. 把第一步分配的aeApiState的内存地址保存在EventLoop->apidata字段中。

    2、初始化events中的mask字段为为AE_NONE。

    生成fileEvent:aeCreateFileEvent()

    Redis使用aeCreateFileEvent()来生成fileEvent,代码如下:

    Redis源码学习之事件驱动

    aeCreateFileEvent()方法主要做了以下三件事:

    1. 检查新增的fd是否超过所能容纳最大值。
    2. 调用aeApiAddEvent()方法把对应的fd以mask模式添加到epoll监听器中。
    3. 设置相应的字段值。

      其中最关键的步骤是第二步,aeApiAddEvent()方法如下:

      Redis源码学习之事件驱动

      生成timeEvent:aeCreateTimeEvent()

      aeCreateTimeEvent()方法主要是用来生成timeEvent节点,其实现比较简单,代码如下所示:

      Redis源码学习之事件驱动

      处理timeEevnt:processTimeEvents()

      Redis在processTimeEvents()方法中来处理所有的timeEvent,实现如下:

      staticintprocessTimeEvents(aeEventLoop*eventLoop){
      
      intprocessed=0;
      
      aeTimeEvent*te,*prev;
      
      longlongmaxId;
      
      time_tnow=time(NULL);
      
      /**
      
      *如果系统时间被调整到将来某段时间然后又被设置回正确的时间,
      
      *这种情况下链表中的timeEvent有可能会被随机的延迟执行,因
      
      *此在这个情况下把所有的timeEvent的触发时间设置为0表示及执行
      
      */
      
      if(now<eventLoop->lastTime){
      
      te=eventLoop->timeEventHead;
      
      while(te){
      
      te->when_sec=0;
      
      te=te->next;
      
      }
      
      }
      
      eventLoop->lastTime=now;//设置上次运行时间为now
      
      
      
      prev=NULL;
      
      te=eventLoop->timeEventHead;
      
      maxId=eventLoop->timeEventNextId-1;
      
      while(te){
      
      longnow_sec,now_ms;
      
      longlongid;
      
      /**
      
      *删除已经被标志位删除的时间事件
      
      */
      
      if(te->id==AE_DELETED_EVENT_ID){
      
      aeTimeEvent*next=te->next;
      
      if(prev==NULL)
      
      eventLoop->timeEventHead=te->next;
      
      else
      
      prev->next=te->next;
      
      if(te->finalizerProc)
      
      //在时间事件节点被删除前调用finlizerProce()方法
      
      te->finalizerProc(eventLoop,te->clientData);
      
      zfree(te);
      
      te=next;
      
      continue;
      
      }
      
      if(te->id>maxId){
      
      /**
      
      *te->id>maxId表示当前te指向的timeEvent为当前循环中新添加的,
      
      *对于新添加的节点在本次循环中不作处理。
      
      *PS:为什么会出现这种情况呢?有可能是在timeProc()里面会注册新的timeEvent节点?
      
      *对于当前的Redis版本中不会出现te->id>maxId这种情况
      
      */
      
      te=te->next;
      
      continue;
      
      }
      
      aeGetTime(&now_sec,&now_ms);
      
      if(now_sec>te->when_sec||
      
      (now_sec==te->when_sec&&now_ms>=te->when_ms))
      
      {
      
      //如果当前时间已经超过了对应的timeEvent节点设置的触发时间,
      
      //则调用timeProc()方法执行对应的任务
      
      intretval;
      
      
      
      id=te->id;
      
      retval=te->timeProc(eventLoop,id,te->clientData);
      
      processed++;
      
      if(retval!=AE_NOMORE){
      
      //要执行多次,则计算下次执行时间
      
      aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
      
      }else{
      
      //如果只需要执行一次,则把id设置为-1,再下次循环中删除
      
      te->id=AE_DELETED_EVENT_ID;
      
      }
      
      }
      
      prev=te;
      
      te=te->next;
      
      }
      
      returnprocessed;
      
      }
      

      在这个方法中会

      1. 判断系统时间有没有调整过,如果调整过,则会把timeEvent链表中的所有的timeEvent的触发时间设置为0,表示立即执行。
      2. 对timeEvent链表进行遍历,对于每个timeEvent节点,如果有:
        • 返回为AE_NOMORE,表示当前timeEvent节点属于一次性事件,标记该节点ID为AE_DELETED_EVENT_ID,表示删除节点,该节点将会在下一轮的循环中被删除。
        • 返回不是AE_NOMORE,表示当前timeEvent节点属于周期性事件,需要多次执行,调用aeAddMillisecondsToNow()方法设置下次被执行时间。
        • 如果已经被标记为删除(AE_DELETED_EVENT_ID),则立即释放对应节点内存,遍历下个节点。
          • 如果id大于maxId,则表示当前节点为本次循环中新增节点,咋本次循环中不错处理,继续下个节点。
          • 如果当前节点的触发时间大于当前时间,则调用对应节点的timeProc()方法执行任务。根据timeProc()方法的返回,又分为两种情况:

        处理所有事件:aeProcessEvents()

        Redis中所有的事件,包括timeEvent和fileEvent都是在aeProcessEvents()方法中进行处理的,刚方法实现如下:

        /*Processeverypendingtimeevent,theneverypendingfileevent
        
        *(thatmayberegisteredbytimeeventcallbacksjustprocessed).
        
        *Withoutspecialflagsthefunctionsleepsuntilsomefileevent
        
        *fires,orwhenthenexttimeeventoccurs(ifany).
        
        *
        
        *Ifflagsis0,thefunctiondoesnothingandreturns.
        
        *ifflagshasAE_ALL_EVENTSset,allthekindofeventsareprocessed.
        
        *ifflagshasAE_FILE_EVENTSset,fileeventsareprocessed.
        
        *ifflagshasAE_TIME_EVENTSset,timeeventsareprocessed.
        
        *ifflagshasAE_DONT_WAITsetthefunctionreturnsASAPuntilall
        
        *theeventsthat'spossibletoprocesswithouttowaitareprocessed.
        
        *
        
        *Thefunctionreturnsthenumberofeventsprocessed.*/
        
        intaeProcessEvents(aeEventLoop*eventLoop,intflags)
        
        {
        
        intprocessed=0,numevents;
        
        /**
        
        *既没有时间事件也没有文件事件,则直接返回
        
        */
        
        if(!(flags&AE_TIME_EVENTS)&&!(flags&AE_FILE_EVENTS))return0;
        
        /**
        
        *-1==eventloop->maxfd表示还么有任何aeFileEvent被添加到epoll
        
        *事件循环中进行监听
        
        */
        
        if(eventLoop->maxfd!=-1||
        
        ((flags&AE_TIME_EVENTS)&&!(flags&AE_DONT_WAIT))){
        
        intj;
        
        aeTimeEvent*shortest=NULL;
        
        structtimevaltv,*tvp;
        
        
        
        /**
        
        *如果有aeFileEvent需要处理,就先要从所有待处理的
        
        *aeTimeEvent事件中找到最近的将要被执行的aeTimeEvent节点
        
        *并结算该节点触发时间
        
        */
        
        if(flags&AE_TIME_EVENTS&&!(flags&AE_DONT_WAIT))
        
        shortest=aeSearchNearestTimer(eventLoop);
        
        if(shortest){
        
        longnow_sec,now_ms;
        
        
        
        aeGetTime(&now_sec,&now_ms);
        
        tvp=&tv;
        
        
        
        /*Howmanymillisecondsweneedtowaitforthenext
        
        *timeeventtofire?*/
        
        //计算epoll_wait()需要等待的时间
        
        longlongms=
        
        (shortest->when_sec-now_sec)*1000+
        
        shortest->when_ms-now_ms;
        
        
        
        if(ms>0){
        
        tvp->tv_sec=ms/1000;
        
        tvp->tv_usec=(ms%1000)*1000;
        
        }else{
        
        tvp->tv_sec=0;
        
        tvp->tv_usec=0;
        
        }
        
        }else{
        
        //如果flags设置了AE_DONT_WAIT,则设置epoll_wait()等待时间为0,
        
        //即立刻从epoll中返回
        
        if(flags&AE_DONT_WAIT){
        
        tv.tv_sec=tv.tv_usec=0;
        
        tvp=&tv;
        
        }else{
        
        /*Otherwisewecanblock*/
        
        tvp=NULL;/*waitforever*/
        
        }
        
        }
        
        
        
        //调用aeApiPoll()进行阻塞等待事件的到来,等待时间为tvp
        
        numevents=aeApiPoll(eventLoop,tvp);
        
        for(j=0;j<numevents;j++){
        
        aeFileEvent*fe=&eventLoop->events[eventLoop->fired[j].fd];
        
        intmask=eventLoop->fired[j].mask;
        
        intfd=eventLoop->fired[j].fd;
        
        intrfired=0;
        
        /*notethefe->mask&mask&...code:maybeanalreadyprocessed
        
        *eventremovedanelementthatfiredandwestilldidn't
        
        *processed,sowecheckiftheeventisstillvalid.*/
        
        //fe->mask&&mask的目的是确保对应事件时候还有效
        
        if(fe->mask&mask&AE_READABLE){
        
        rfired=1;
        
        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
        
        }
        
        if(fe->mask&mask&AE_WRITABLE){
        
        if(!rfired||fe->wfileProc!=fe->rfileProc)
        
        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
        
        }
        
        processed++;
        
        }
        
        }
        
        /*Checktimeevents*/
        
        if(flags&AE_TIME_EVENTS)
        
        //处理aeTimeEvent
        
        processed+=processTimeEvents(eventLoop);
        
        
        
        returnprocessed;/*returnthenumberofprocessedfile/timeevents*/
        
        }
        

        该方法的入参flag表示要处理哪些事件,可以取以下几个值 :

        • AE_ALL_EVENTS:timeEvent和fileEvent都会处理。
        • AE_FILE_EVENTS:只处理fileEvent。
        • AE_TIME_EVENTS:只处理timeEvent。
        • AE_DONT_WAIT:要么立马返回,要么处理完那些不需要等待的事件之后再立马返回。

        aeProcessEvents()方法会做下面几件事:

        1. 判断传入的flag的值,如果既不包含AE_TIME_EVENTS也不包含AE_FILE_EVENTS则直接返回。
        2. 计算如果有aeFileEvent事件需要进行处理,则先计算epoll_wait()方法需要阻塞等待的时间,计算方式如下:
          • 先从aeTimeEvent事件链表中找到最近的需要被触发的aeTimeEvent节点并计算需要被触发的时间,该被触发时间则为epoll_wait()需要等待的时间。
          • 如果没有找到最近的aeTimeEvent节点,表示没有aeTimeEvent节点被加入链表,则判断传入的flags是否包含AE_DONT_WAIT选项,则设置epoll_wait()需要等待时间为0,表示立即返回。
          • 如果没有设置AE_DONT_WAIT,则设置需要等待时间为NULL,表示epoll_wait()一直阻塞等待知道有fileEvent事件到来。
        3. 调用aeApiPoll()方法阻塞等待事件的到来,阻塞时间为第二步中计算的时间。aeApiPoll()实现见文末:
          • aeApiPoll()会做下面几件事:
            • 根据传入的tvp计算需要阻塞的时间,然后调用epoll_wait()进行阻塞等待。
            • 有事件到来之后先计算对应事件的类型。
            • 把事件发生的fd以及对应的类型mask拷贝到fired数组中。
        4. 从aeApiPoll()方法返回之后,所有事件已经就绪了的fd以及对应事件的类型mask已经保存在eventLoop->fired[]数组中。依次遍历fired数组,根据mask类型,执行对应的frileProc()或者wfileProce()方法。
        5. 如果传入的flags中有AE_TIME_EVENTS,则调用processTimeEvents()执行所有已经到时间了的timeEvent。

          Redis源码学习之事件驱动

          本系列

          • Redis 源码学习之 Redis 事务

继续浏览有关 Redis 的文章
发表评论