源码位置:ae.c/ae.h/ae_evport.c/ae_epoll.c/ae_kqueue.c/ae_select.c
1. 简介 Redis 是一个事件驱动的内存数据库,服务器需要处理两种类型的事件:
文本(IO)事件AE_FILE_EVENTS
时间事件AE_TIME_EVENTS
2. 文本(IO)事件 Redis是基于 Reactor 模式开发了自己的网络事件处理器,这个处理器被称为文件事件处理器。
文件事件处理器使用 I/O 多路复用(multiplexing)模型来同时监听多个套接字, 并根据套接字目前执行的任务来为套接字关联不同的事件处理器。
当被监听的套接字准备好执行连接应答(accept)、读取(read)、写入(write)、关闭(close)等操作时, 与操作相对应的文件事件就会产生, 这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。
虽然文件事件处理器以单线程方式运行, 但通过使用 I/O 多路复用模型来监听多个套接字, 文件事件处理器既实现了高性能的网络通信模型, 又可以很好地与 Redis 服务器中其他同样以单线程方式运行的模块进行对接, 这保持了 Redis 内部单线程设计的简单性。 优势:
完全基于内存,绝大部分请求是纯粹的内存操作,非常快速
数据结构简单,对数据操作也简单,Redis中的数据结构是专门进行设计的
采用单线程,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗 CPU,不用去考虑各种锁的问题,不存在加锁释放锁操作。
使用多路I/O复用模型,非阻塞IO。
Redis提供了4中 I/O 多路复用的方式,其性能由高到低依次是:evport
-> epoll
-> kqueue
-> select
。 不同的平台使用了不同的实现方式,比如 epoll 和 select 可以用于Linux平台,kqueue 用于MacOS平台,select 用于Windows平台,evport 用于Solaris平台。 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif
一般情况下,Redis服务端都是部署在Linux系统上的,所以本文内容就只解析Redis是怎么利用epoll实现 I/O 多路复用的吧。
epoll: epoll相关的接口实现都封装在了ae_epoll.c中,主要提供了以下几个函数:
1 2 3 4 5 6 7 static int aeApiCreate (aeEventLoop *eventLoop) ; static int aeApiResize (aeEventLoop *eventLoop, int setsize) ; static void aeApiFree (aeEventLoop *eventLoop) ; static int aeApiAddEvent (aeEventLoop *eventLoop, int fd, int mask) ; static void aeApiDelEvent (aeEventLoop *eventLoop, int fd, int delmask) ; static int aeApiPoll (aeEventLoop *eventLoop, struct timeval *tvp) ; static char *aeApiName (void ) { return "epoll" ; }
在redis.h/redisServer 结构中,保存着基于事件驱动的程序状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 struct redisServer { aeEventLoop *el; } typedef struct aeEventLoop { int maxfd; int setsize; long long timeEventNextId; time_t lastTime; aeFileEvent *events; aeFiredEvent *fired; aeTimeEvent *timeEventHead; int stop; void *apidata; aeBeforeSleepProc *beforesleep; aeBeforeSleepProc *aftersleep; int flags; } aeEventLoop;
Redis服务器在启动时,首先会创建事件轮询器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 aeEventLoop *aeCreateEventLoop (int setsize) { aeEventLoop *eventLoop; int i; if ((eventLoop = zmalloc(sizeof (*eventLoop))) == NULL ) goto err; eventLoop->events = zmalloc(sizeof (aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof (aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL ) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL ); eventLoop->timeEventHead = NULL ; eventLoop->timeEventNextId = 0 ; eventLoop->stop = 0 ; eventLoop->maxfd = -1 ; eventLoop->beforesleep = NULL ; eventLoop->aftersleep = NULL ; eventLoop->flags = 0 ; if (aeApiCreate(eventLoop) == -1 ) goto err; for (i = 0 ; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL ; }
创建完成后,开始事件轮询主循环:
1 2 3 4 5 6 7 8 void aeMain (aeEventLoop *eventLoop) { eventLoop->stop = 0 ; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL ) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } }
事件的调度和执行函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 int aeProcessEvents (aeEventLoop *eventLoop, int flags) { int processed = 0 , numevents; if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0 ; if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL ; struct timeval tv , *tvp ; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { long now_sec, now_ms; aeGetTime(&now_sec, &now_ms); tvp = &tv; long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms; if (ms > 0 ) { tvp->tv_sec = ms/1000 ; tvp->tv_usec = (ms % 1000 )*1000 ; } else { tvp->tv_sec = 0 ; tvp->tv_usec = 0 ; } } else { if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0 ; tvp = &tv; } else { tvp = NULL ; } } if (eventLoop->flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0 ; tvp = &tv; } numevents = aeApiPoll(eventLoop, tvp); if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop); for (j = 0 ; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int fired = 0 ; int invert = fe->mask & AE_BARRIER; if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } if (invert && fe->mask & mask & AE_READABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; }
3. 时间事件 Redis有很多操作需要在给定的时间点进行处理,时间事件就是对这类定时任务的抽象。 先看看时间事件的数据结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 typedef struct aeTimeEvent { long long id; long when_sec; long when_ms; aeTimeProc *timeProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *prev ; struct aeTimeEvent *next ; } aeTimeEvent;
处理时间事件的函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 static int processTimeEvents (aeEventLoop *eventLoop) { int processed = 0 ; aeTimeEvent *te; long long maxId; time_t now = time(NULL ); if (now < eventLoop->lastTime) { te = eventLoop->timeEventHead; while (te) { te->when_sec = 0 ; te = te->next; } } eventLoop->lastTime = now; te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1 ; while (te) { long now_sec, now_ms; long long id; if (te->id == AE_DELETED_EVENT_ID) { aeTimeEvent *next = te->next; if (te->prev) te->prev->next = te->next; else eventLoop->timeEventHead = te->next; if (te->next) te->next->prev = te->prev; if (te->finalizerProc) te->finalizerProc(eventLoop, te->clientData); zfree(te); te = next; continue ; } if (te->id > maxId) { te = te->next; continue ; } aeGetTime(&now_sec, &now_ms); if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; retval = te->timeProc(eventLoop, id, te->clientData); processed++; if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); } else { te->id = AE_DELETED_EVENT_ID; } } te = te->next; } return processed; }
4. 总结 为什么单线程的Redis会这么快?
Redis是完全基于内存的数据库,绝大部分请求是纯粹的内存操作,非常快速。
Redis由C语言编写,数据结构简单,对数据的操作也简单,Redis中的数据结构是专门设计的。
Redis采用单线程,保证了数据操作的原子性,不存在多进程或者多线程导致切换而消耗CPU,避免了不必要的上下文切换和竞争条件,不存在加/解锁的操作,不用考虑可能出现的死锁导致性能消耗。
使用 I/O 多路复用模型,非阻塞IO,可以处理并发的连接。