源码位置:pubsub.c/redis.h
1. 前言 Redis发布订阅(pub/sub)是一种消息通信模式,由三部分组成:发布者(pub),频道(channel),订阅者(sub)。具体结构如下:
发布者和订阅者都是Redis客户端,频道是Redis服务端,发布者将消息发布到某一频道上,订阅了这一频道的订阅者就会收到该条信息。Redis客户端可订阅任意数量的频道。 Redis的发布订阅功能并不保证可靠,因为所有数据都存在内存中,没有提供持久化的功能,也不记录消费端状态,所以相对市面上的一些消息队列相比(如kafka、rabittMQ等),可靠性会差很多。在Redis5.0版本的stream消息队列功能发布之前,会有使用者使用redis-list来实现消息队列和发布订阅的功能,虽然有持久化(AOF & RDB)的功能,但是实现起来比较笨拙,不够方便。
pubsub与stream比较:
pub/sub
stream
不能持久化消息
可以持久化,支持RDB和AOF两种持久化机制
没有消息队列中群组的概念
引入了消费组的概念
redis客户端断线重连会丢失中间的数据
支持position,能够消费历史消息。断线后支持消息继续从上次的时间点读取,不会丢失消息,也可以直接读取最新消息
redis断线后需要重新订阅
不存在这个问题
没有ack机制
有ACK机制,能够一定程度保证消息“at least once” 消费
基于stream消息队列的多种好处,pub/sub功能仅做源码学习,实际项目中推荐使用stream或者kafka等消息队列。
2. 命令介绍 下面介绍命令的使用详情:
PSUBSCRIBE pattern [pattern1 ….]
说明:订阅一个或多个符合给定模式的频道,每个模式以*作为匹配符
参数:pattern(给定的模式)
返回:接受到的信息
PUNSUBSCRIBE pattern [pattern1 ….]
说明:用于退订所有给定模式的频道
参数:pattern(给定的模式)
返回:这个命令在不同的客户端中有不同的表现。
SUBSCRIBE channel [channel1 …]
说明:用于订阅给定的一个或多个频道的信息
参数:channel(给定的频道名)
返回:接收到的信息
UNSUBSCRIBE channel [channel1 …]
说明:用于退订给定的一个或多个频道的信息
参数:channel(给定的频道名)
返回:这个命令在不同的客户端中有不同的表现
PUBLISH channel message
说明:用于将信息发送到指定的频道
参数:channel(频道名称),message(将要发送的信息)
返回:接收到此消息的订阅者数量
PUBSUB < subcommand > argument [argument1 ….]
说明:用于查看订阅与发布系统状态,它由数个不同格式的子命令组成
参数:subcommand(子命令),argument(子命令参数)
返回:由活跃频道组成的列表
子命令如下:
subcommand
argument
说明
CHANNELS
[pattern]
返回指定模式pattern的活跃的频道,指定返回由SUBSCRIBE订阅的频道
NUMSUB
channel channel2 …
返回指定频道的订阅数量
NUMPAT
返回订阅模式的数量,注意:这个命令返回的不是订阅模式的客户端的数量, 而是客户端订阅的所有模式的数量总和
3. 实现原理 每个 Redis 服务器进程维持着一个标识服务器状态 的 redis.h/redisServer 结构,其中就 保存着有订阅的频道 以及 订阅模式 的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 struct redisServer { dict *pubsub_channels; list *pubsub_patterns; }; typedef struct client { dict *pubsub_channels; list *pubsub_patterns; }
1.在Redis服务端内部维护了一个 pubsub_channels
的channel列表,记录了此客户端所订阅的频道。
2.当客户端订阅某一个频道之后,Redis服务端就会往自身的 pubsub_channels
这个字典变量中新添加一条数据,实际上这个 dict 字典维护的是一张链表,比如,下图展示的 pubsub_channels 示例中,client 1、client 2 就订阅了 channel 1,而其他频道也分别被其他客户端订阅:
3.当一个Redis客户端publish一个message的时候,会先去服务端的 pubsub_channels
找相应的channel,遍历里面的client,然后发送通知,即完成了整个发布订阅的流程。
下面我们通过代码来看看pub/sub的实现吧。
函数功能总览
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void subscribeCommand (client *c) ; void unsubscribeCommand (client *c) ; void psubscribeCommand (client *c) ; void punsubscribeCommand (client *c) ; void publishCommand (client *c) ; void pubsubCommand (client *c) ; void freePubsubPattern (void *p) ; int listMatchPubsubPattern (void *a, void *b) ; int clientSubscriptionsCount (redisClient *c) ; int pubsubSubscribeChannel (redisClient *c, robj *channel) ; int pubsubUnsubscribeChannel (redisClient *c, robj *channel, int notify) ; int pubsubSubscribePattern (redisClient *c, robj *pattern) ; int pubsubUnsubscribePattern (redisClient *c, robj *pattern, int notify) ; int pubsubUnsubscribeAllChannels (redisClient *c, int notify) ; int pubsubUnsubscribeAllPatterns (redisClient *c, int notify) ; int pubsubPublishMessage (robj *channel, robj *message) ;
主要函数实现
Redis客户端订阅频道:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 void subscribeCommand (client *c) { int j; for (j = 1 ; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j]); c->flags |= CLIENT_PUBSUB; } int pubsubSubscribeChannel (client *c, robj *channel) { dictEntry *de; list *clients = NULL ; int retval = 0 ; if (dictAdd(c->pubsub_channels,channel,NULL ) == DICT_OK) { retval = 1 ; incrRefCount(channel); de = dictFind(server.pubsub_channels,channel); if (de == NULL ) { clients = listCreate(); dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else { clients = dictGetVal(de); } listAddNodeTail(clients,c); } addReplyPubsubSubscribed(c,channel); return retval; } int clientSubscriptionsCount (client *c) { return dictSize(c->pubsub_channels)+ listLength(c->pubsub_patterns); }
总结:
客户端自行管理需要订阅的channel, 放到 c->pubsub_channels 中;
redis使用的一个统一的 server->pubsub_channels dict容器进行管理所有的channel;
对于多个客户端订阅一个channel, redis 使用list进行管理追加;
Redis客户端退订频道:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 void unsubscribeCommand (client *c) { if (c->argc == 1 ) { pubsubUnsubscribeAllChannels(c,1 ); } else { int j; for (j = 1 ; j < c->argc; j++) pubsubUnsubscribeChannel(c,c->argv[j],1 ); } if (clientSubscriptionsCount(c) == 0 ) c->flags &= ~CLIENT_PUBSUB; } int pubsubUnsubscribeAllChannels (client *c, int notify) { dictIterator *di = dictGetSafeIterator(c->pubsub_channels); dictEntry *de; int count = 0 ; while ((de = dictNext(di)) != NULL ) { robj *channel = dictGetKey(de); count += pubsubUnsubscribeChannel(c,channel,notify); } if (notify && count == 0 ) addReplyPubsubUnsubscribed(c,NULL ); dictReleaseIterator(di); return count; } int pubsubUnsubscribeChannel (client *c, robj *channel, int notify) { dictEntry *de; list *clients; listNode *ln; int retval = 0 ; incrRefCount(channel); if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { retval = 1 ; de = dictFind(server.pubsub_channels,channel); serverAssertWithInfo(c,NULL ,de != NULL ); clients = dictGetVal(de); ln = listSearchKey(clients,c); serverAssertWithInfo(c,NULL ,ln != NULL ); listDelNode(clients,ln); if (listLength(clients) == 0 ) { dictDelete(server.pubsub_channels,channel); } } if (notify) addReplyPubsubUnsubscribed(c,channel); decrRefCount(channel); return retval; }