源码位置:anet.c/anet.h/networking.c
1. 简介
Redis在anet.c
中对TCP/IP网络中socket api接口和状态设置进行了封装。状态设置主要包括socket连接的阻塞性、tcp的保活定时器的设置、设置发送缓冲区、tcp的nagle算法设置、设置发送/接收超时时间、地址重用的设置和IPv6/IPv4的设置等。
Redis网络通讯的具体实现在networking.c
中,主要包括如何建立和客户端的连接,并且接收其命令,返回给客户端。
2. 回顾tcp socket编程
2.1 TCP客户/服务器程序socket编程流程如下:
2.2 TCP的三次握手
2.3 TCP的四次挥手
TCP的断开连接操作可由任意一端发起
3. anet解析
anet.h中定义的函数如下:
1 | int anetTcpConnect(char *err, const char *addr, int port); // tcp连接 |
Redis服务启动后会调用server.c/listenToPort()
进行socket相关的设置和端口监听。如果服务器配置不包含要绑定的特定地址,则该函数会尝试IPv6(调用anetTcp6Server()
)和IPv4(调用anetTcpServer()
)协议进行绑定。
不管是使用IPv6还是IPv4协议,最终调用的都是_anetTcpServer()
来创建socket并进行绑定监听,以下是该函数的实现代码:
1 | static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) |
4. networking解析
新版本Redis增加了多线程I/O来改进读写缓冲区的性能,而不是改进命令执行的性能主要原因是:
- 读写缓冲区在命令执行的生命周期中是占了比较大的比重
- Redis更倾向于保持简单的设计,如果在命令执行部分改用多线程会不得不处理各种问题,例如并发写入、加锁等
那么将读写缓冲区改为多线程后整个模型大致如下:
4.1 线程初始化
首先,如果用户没有开启多线程IO,也就是io_threads_num == 1时直接按照单线程模型处理,如果超过线程数IO_THREADS_MAX_NUM上限则异常退出。
紧接着Redis使用listCreate()创建io_threads_num个线程,并且对主线程(id=0)以外的线程进行处理:
- 初始化线程的等待任务数为0
- 获取锁,使得线程不能进行操作
- 将线程tid与Redis中的线程id(for循环生成)进行映射
1 | void initThreadedIO(void) { |
4.2 建立连接
Redis服务端会初始化一个socket端口来监听客户端的连接,当一个连接建立后,服务端会对客户端的socket进行设置:
- 客户端socket设置为非阻塞模式,因为Redis采用的是非阻塞I/O多路复用模型。
- 客户端socket设置为 TCP_NODELAY 属性,禁用 Nagle 算法。
- 将该socket绑定读事件到时间loop,用于监听这个客户端socket的数据发送。
- 建立连接后如果发现已经超过最大连接数,则关闭连接,删除该客户端socket。
1 | client *createClient(connection *conn) { |
4.3 读事件到来
读事件到来,Redis需要判断是否满足Threaded IO条件。如果符合,则将client放到等待读取的队列中,并将client的flag设置为等待读取;如果不符合,则按照单线程模型往下继续处理。
等待读取队列由server维护,包含了所有处于读事件pending的客户端列表。
1 | void readQueryFromClient(connection *conn) { |
如果开启了Threaded-IO,如何分配读取pending的client给thread呢?handleClientsWithPendingReadsUsingThreads()
1 | int handleClientsWithPendingReadsUsingThreads(void) { |
如何处理读请求?
在上面的过程中,当任务分发完毕后,每个线程按照正常流程将自己负责的Client的读取缓冲区的内容进行处理,和原来的单线程没有太大差异。
每轮处理中,需要将各个线程的锁开启,打开标志:
1 | void startThreadedIO(void) { |
同样结束时,首先需要检查是否有剩余待读的IO,如果没有,将线程锁定,标志关闭:
1 | void stopThreadedIO(void) { |
4.3 写入返回缓冲区
众多的addReply*()
方法最终会调用_addReplyToBuffer()
函数在缓冲区中添加回复数据。
1 | int _addReplyToBuffer(client *c, const char *s, size_t len) { |
如果调用_addReplyToBuffer()
函数失败,将会调用另一个函数_addReplyProtoToList()
1 | void _addReplyProtoToList(client *c, const char *s, size_t len) { |