源码位置:pubsub.c/redis.h
1. 前言  Redis发布订阅(pub/sub)是一种消息通信模式,由三部分组成:发布者(pub),频道(channel),订阅者(sub)。具体结构如下:
 
 发布者和订阅者都是Redis客户端,频道是Redis服务端,发布者将消息发布到某一频道上,订阅了这一频道的订阅者就会收到该条信息。Redis客户端可订阅任意数量的频道。 Redis的发布订阅功能并不保证可靠,因为所有数据都存在内存中,没有提供持久化的功能,也不记录消费端状态,所以相对市面上的一些消息队列相比(如kafka、rabittMQ等),可靠性会差很多。在Redis5.0版本的stream消息队列功能发布之前,会有使用者使用redis-list来实现消息队列和发布订阅的功能,虽然有持久化(AOF & RDB)的功能,但是实现起来比较笨拙,不够方便。
pubsub与stream比较:
pub/sub 
stream 
 
 
不能持久化消息 
可以持久化,支持RDB和AOF两种持久化机制 
 
没有消息队列中群组的概念 
引入了消费组的概念 
 
redis客户端断线重连会丢失中间的数据 
支持position,能够消费历史消息。断线后支持消息继续从上次的时间点读取,不会丢失消息,也可以直接读取最新消息 
 
redis断线后需要重新订阅 
 
不存在这个问题 
 
没有ack机制 
有ACK机制,能够一定程度保证消息“at least once” 消费 
 
 
 
基于stream消息队列的多种好处,pub/sub功能仅做源码学习,实际项目中推荐使用stream或者kafka等消息队列。
2. 命令介绍  下面介绍命令的使用详情:
PSUBSCRIBE pattern [pattern1 ….]
说明:订阅一个或多个符合给定模式的频道,每个模式以*作为匹配符 
参数:pattern(给定的模式) 
返回:接受到的信息 
 
 
PUNSUBSCRIBE pattern [pattern1 ….]
说明:用于退订所有给定模式的频道 
参数:pattern(给定的模式) 
返回:这个命令在不同的客户端中有不同的表现。 
 
 
SUBSCRIBE channel [channel1 …]
说明:用于订阅给定的一个或多个频道的信息 
参数:channel(给定的频道名) 
返回:接收到的信息 
 
 
UNSUBSCRIBE channel [channel1 …]
说明:用于退订给定的一个或多个频道的信息 
参数:channel(给定的频道名) 
返回:这个命令在不同的客户端中有不同的表现 
 
 
PUBLISH channel message
说明:用于将信息发送到指定的频道 
参数:channel(频道名称),message(将要发送的信息) 
返回:接收到此消息的订阅者数量 
 
 
PUBSUB < subcommand > argument [argument1 ….]
说明:用于查看订阅与发布系统状态,它由数个不同格式的子命令组成 
参数:subcommand(子命令),argument(子命令参数) 
返回:由活跃频道组成的列表 
 
 
 
子命令如下:
subcommand 
argument 
说明 
 
 
CHANNELS 
[pattern] 
返回指定模式pattern的活跃的频道,指定返回由SUBSCRIBE订阅的频道 
 
NUMSUB 
channel channel2 … 
返回指定频道的订阅数量 
 
NUMPAT 
 
返回订阅模式的数量,注意:这个命令返回的不是订阅模式的客户端的数量, 而是客户端订阅的所有模式的数量总和 
 
 
 
3. 实现原理  每个 Redis 服务器进程维持着一个标识服务器状态 的 redis.h/redisServer 结构,其中就 保存着有订阅的频道 以及 订阅模式 的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 struct  redisServer  {              dict *pubsub_channels;       list  *pubsub_patterns;        }; typedef  struct  client  {         dict *pubsub_channels;       list  *pubsub_patterns;        } 
 
1.在Redis服务端内部维护了一个 pubsub_channels 的channel列表,记录了此客户端所订阅的频道。
2.当客户端订阅某一个频道之后,Redis服务端就会往自身的 pubsub_channels 这个字典变量中新添加一条数据,实际上这个 dict 字典维护的是一张链表,比如,下图展示的 pubsub_channels 示例中,client 1、client 2 就订阅了 channel 1,而其他频道也分别被其他客户端订阅:
3.当一个Redis客户端publish一个message的时候,会先去服务端的 pubsub_channels 找相应的channel,遍历里面的client,然后发送通知,即完成了整个发布订阅的流程。
 
下面我们通过代码来看看pub/sub的实现吧。
函数功能总览  
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void  subscribeCommand (client *c)  ; void  unsubscribeCommand (client *c)  ; void  psubscribeCommand (client *c)  ; void  punsubscribeCommand (client *c)  ; void  publishCommand (client *c)  ; void  pubsubCommand (client *c)  ; void  freePubsubPattern (void  *p)  ; int  listMatchPubsubPattern (void  *a, void  *b)  ; int  clientSubscriptionsCount (redisClient *c)  ; int  pubsubSubscribeChannel (redisClient *c, robj *channel)  ; int  pubsubUnsubscribeChannel (redisClient *c, robj *channel, int  notify)  ; int  pubsubSubscribePattern (redisClient *c, robj *pattern)  ; int  pubsubUnsubscribePattern (redisClient *c, robj *pattern, int  notify)  ; int  pubsubUnsubscribeAllChannels (redisClient *c, int  notify)  ; int  pubsubUnsubscribeAllPatterns (redisClient *c, int  notify)  ; int  pubsubPublishMessage (robj *channel, robj *message)  ; 
 
主要函数实现  
Redis客户端订阅频道: 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 void  subscribeCommand (client *c)   {    int  j;     for  (j = 1 ; j < c->argc; j++)         pubsubSubscribeChannel(c,c->argv[j]);          c->flags |= CLIENT_PUBSUB; } int  pubsubSubscribeChannel (client *c, robj *channel)   {    dictEntry *de;     list  *clients = NULL ;     int  retval = 0 ;          if  (dictAdd(c->pubsub_channels,channel,NULL ) == DICT_OK) {         retval = 1 ;         incrRefCount(channel);                  de = dictFind(server.pubsub_channels,channel);         if  (de == NULL ) {                          clients = listCreate();             dictAdd(server.pubsub_channels,channel,clients);             incrRefCount(channel);         } else  {                          clients = dictGetVal(de);         }                  listAddNodeTail(clients,c);     }          addReplyPubsubSubscribed(c,channel);     return  retval; } int  clientSubscriptionsCount (client *c)   {    return  dictSize(c->pubsub_channels)+            listLength(c->pubsub_patterns); } 
 
总结:
客户端自行管理需要订阅的channel, 放到 c->pubsub_channels 中; 
redis使用的一个统一的 server->pubsub_channels dict容器进行管理所有的channel; 
对于多个客户端订阅一个channel, redis 使用list进行管理追加; 
 
Redis客户端退订频道: 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 void  unsubscribeCommand (client *c)   {    if  (c->argc == 1 ) {         pubsubUnsubscribeAllChannels(c,1 );     } else  {         int  j;         for  (j = 1 ; j < c->argc; j++)             pubsubUnsubscribeChannel(c,c->argv[j],1 );     }     if  (clientSubscriptionsCount(c) == 0 ) c->flags &= ~CLIENT_PUBSUB; } int  pubsubUnsubscribeAllChannels (client *c, int  notify)   {    dictIterator *di = dictGetSafeIterator(c->pubsub_channels);     dictEntry *de;     int  count = 0 ;          while ((de = dictNext(di)) != NULL ) {         robj *channel = dictGetKey(de);         count += pubsubUnsubscribeChannel(c,channel,notify);     }          if  (notify && count == 0 ) addReplyPubsubUnsubscribed(c,NULL );     dictReleaseIterator(di);     return  count; } int  pubsubUnsubscribeChannel (client *c, robj *channel, int  notify)   {    dictEntry *de;     list  *clients;     listNode *ln;     int  retval = 0 ;          incrRefCount(channel);     if  (dictDelete(c->pubsub_channels,channel) == DICT_OK) {         retval = 1 ;                  de = dictFind(server.pubsub_channels,channel);         serverAssertWithInfo(c,NULL ,de != NULL );         clients = dictGetVal(de);         ln = listSearchKey(clients,c);         serverAssertWithInfo(c,NULL ,ln != NULL );         listDelNode(clients,ln);         if  (listLength(clients) == 0 ) {                          dictDelete(server.pubsub_channels,channel);         }     }     if  (notify) addReplyPubsubUnsubscribed(c,channel);     decrRefCount(channel);      return  retval; }