源码位置:multi.c/redis.h
1. 简介 Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。 总结说:redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。
Redis事务的ACID特性:
A原子性(atomicity) 单个Redis命令的执行是原子性的,但Redis没有在事务上增加任何维护原子性的机制,所以Redis事务的执行不是原子性 的。 另一方面,如果 Redis 服务器进程在执行事务的过程中被停止 —— 比如接到 KILL 信号、宿主机器停机,等等,那么事务执行失败 事务失败时,Redis 也不会进行任何的重试或者回滚动作,不满足要么全部全部执行,要么都不执行的条件
C一致性(consistency): 一致性分下面几种情况来讨论:
首先,如果一个事务的指令全部被执行,那么数据库的状态是满足数据库完整性约束的
其次,如果一个事务中有的指令有错误,那么数据库的状态是满足数据完整性约束的
最后,如果事务运行到某条指令时,进程被kill掉了,那么要分下面几种情况讨论:
如果当前redis采用的是内存模式,那么重启之后redis数据库是空的,那么满足一致性条件
如果当前采用RDB模式存储的,在执行事务时,Redis 不会中断事务去执行保存 RDB 的工作,只有在事务执行之后,保存 RDB 的工作才有可能开始。所以当 RDB 模式下的 Redis 服务器进程在事务中途被杀死时,事务内执行的命令,不管成功了多少,都不会被保存到 RDB 文件里。 恢复数据库需要使用现有的 RDB 文件,而这个 RDB 文件的数据保存的是最近一次的数 据库快照(snapshot),所以它的数据可能不是最新的,但只要 RDB 文件本身没有因为其他问题而出错,那么还原后的数据库就是一致的
如果当前采用的是AOF存储的,那么可能事务的内容还未写入到AOF文件,那么此时肯定是满足一致性的,如果事务的内容有部分写入到AOF文件中,那么需要用工具把AOF中事务执行部分成功的指令移除,这时,移除之后的AOF文件也是满足一致性的
所以,redis事务满足一致性约束 。
I隔离性(isolation): Redis 是单进程程序,并且它保证在执行事务时,不会对事务进行中断,事务可以运行直到执行完所有事务队列中的命令为止。因此,Redis 的事务是总是带有隔离性 的。
D持久性(durability): 因为事务不过是用队列包裹起了一组 Redis 命令,并没有提供任何额外的持久性功能,所以事务的持久性由 Redis 所使用的持久化模式决定
在单纯的内存模式下,事务肯定是不持久的
在 RDB 模式下,服务器可能在事务执行之后、RDB 文件更新之前的这段时间失败,所以 RDB 模式下的 Redis 事务也是不持久的
在 AOF 的“总是 SYNC ”模式下,事务的每条命令在执行成功之后,都会立即调用 fsync 或 fdatasync 将事务数据写入到 AOF 文件。但是,这种保存是由后台线程进行的,主线程不会阻塞直到保存成功,所以从命令执行成功到数据保存到硬盘之间,还是有一段非常小的间隔,所以这种模式下的事务也是不持久的。
其他 AOF 模式也和“总是 SYNC ”模式类似,所以它们都是不持久的。
2. 命令介绍 下面介绍命令的使用详情:
命令
作用
MULTI
标记一个事务的开始
DISCARD
取消事务,放弃执行事务块内的所有命令
EXEC
执行事务内的所有命令
WATCH key [key …]
监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断,不会被执行
UNWATCH
取消 WATCH 命令对所有 key 的监视
3. 实现细节 Redis事务从开始到结束通常分为三步:
事务开始(MULTI):MULTI命令可以将执行该命令的客户端从非事务状态切换成事务状态,这一切换是通过客户端状态的flags属性中打开 CLIENT_MULTI
标识完成的。
命令入队:切换到事务状态后,该客户端输入的所有命令,都会被暂存到一个命令队列里,不会立即执行。
事务执行(EXEX):EXEC命令将命令队列里的命令挨个执行完成。
Redis会把每个连接的客户端封装成一个client结构体,该结构体包含大量的字段用来保存需要的信息。其中,事务相关的字段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 typedef struct client { multiState mstate; list *watched_keys; } typedef struct multiCmd { robj **argv; int argc; struct redisCommand *cmd ; } multiCmd; typedef struct multiState { multiCmd *commands; int count; int cmd_flags; int minreplicas; time_t minreplicas_timeout; } multiState; typedef struct watchedKey { robj *key; redisDb *db; } watchedKey;
需要注意的是,客户端打开事务操作标识后,只有命令:EXEC、DISCARD、WATCH、MULTI命令会被立即执行,该逻辑在server.c文件中的processCommand方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 int processCommand (client *c) { if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { queueMultiCommand(c); addReply(c,shared.queued); } else { call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnKeys(); } } void queueMultiCommand (client *c) { multiCmd *mc; int j; c->mstate.commands = zrealloc(c->mstate.commands, sizeof (multiCmd)*(c->mstate.count+1 )); mc = c->mstate.commands+c->mstate.count; mc->cmd = c->cmd; mc->argc = c->argc; mc->argv = zmalloc(sizeof (robj*)*c->argc); memcpy (mc->argv,c->argv,sizeof (robj*)*c->argc); for (j = 0 ; j < c->argc; j++) incrRefCount(mc->argv[j]); c->mstate.count++; c->mstate.cmd_flags |= c->cmd->flags; }
最后我们考虑一下watch机制的触发时机,现在我们已经把想要watch的key加入到了watch的数据结构中,可以想到触发watch的时机应该是修改key的内容时,通知到所有watch了该key的客户端。该触发机制的源码在multi.c文件的touchWatchedKey()
函数中实现。
函数功能总览
1 2 3 4 5 void multiCommand (client *c) ; void execCommand (client *c) ; void discardCommand (client *c) ; void watchCommand (client *c) ; void unwatchCommand (client *c) ;
主要函数实现
事务开始:
1 2 3 4 5 6 7 8 9 void multiCommand (client *c) { if (c->flags & CLIENT_MULTI) { addReplyError(c,"MULTI calls can not be nested" ); return ; } c->flags |= CLIENT_MULTI; addReply(c,shared.ok); }
执行事务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 void execCommand (client *c) { int j; robj **orig_argv; int orig_argc; struct redisCommand *orig_cmd ; int must_propagate = 0 ; int was_master = server.masterhost == NULL ; if (!(c->flags & CLIENT_MULTI)) { addReplyError(c,"EXEC without MULTI" ); return ; } if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) { addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr : shared.nullarray[c->resp]); discardTransaction(c); goto handle_monitor; } if (!server.loading && server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE) { addReplyError(c, "Transaction contains write commands but instance " "is now a read-only replica. EXEC aborted." ); discardTransaction(c); goto handle_monitor; } unwatchAllKeys(c); orig_argv = c->argv; orig_argc = c->argc; orig_cmd = c->cmd; addReplyArrayLen(c,c->mstate.count); for (j = 0 ; j < c->mstate.count; j++) { c->argc = c->mstate.commands[j].argc; c->argv = c->mstate.commands[j].argv; c->cmd = c->mstate.commands[j].cmd; if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) { execCommandPropagateMulti(c); must_propagate = 1 ; } int acl_keypos; int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); if (acl_retval != ACL_OK) { addACLLogEntry(c,acl_retval,acl_keypos,NULL ); addReplyErrorFormat(c, "-NOPERM ACLs rules changed between the moment the " "transaction was accumulated and the EXEC call. " "This command is no longer allowed for the " "following reason: %s" , (acl_retval == ACL_DENIED_CMD) ? "no permission to execute the command or subcommand" : "no permission to touch the specified keys" ); } else { call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL); } c->mstate.commands[j].argc = c->argc; c->mstate.commands[j].argv = c->argv; c->mstate.commands[j].cmd = c->cmd; } c->argv = orig_argv; c->argc = orig_argc; c->cmd = orig_cmd; discardTransaction(c); if (must_propagate) { int is_master = server.masterhost == NULL ; server.dirty++; if (server.repl_backlog && was_master && !is_master) { char *execcmd = "*1\r\n$4\r\nEXEC\r\n" ; feedReplicationBacklog(execcmd,strlen (execcmd)); } } handle_monitor: if (listLength(server.monitors) && !server.loading) replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); }
取消事务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 void discardCommand (client *c) { if (!(c->flags & CLIENT_MULTI)) { addReplyError(c,"DISCARD without MULTI" ); return ; } discardTransaction(c); addReply(c,shared.ok); } void discardTransaction (client *c) { freeClientMultiState(c); initClientMultiState(c); c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC); unwatchAllKeys(c); } void freeClientMultiState (client *c) { int j; for (j = 0 ; j < c->mstate.count; j++) { int i; multiCmd *mc = c->mstate.commands+j; for (i = 0 ; i < mc->argc; i++) decrRefCount(mc->argv[i]); zfree(mc->argv); } zfree(c->mstate.commands); } void initClientMultiState (client *c) { c->mstate.commands = NULL ; c->mstate.count = 0 ; c->mstate.cmd_flags = 0 ; }
watch监视:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 void watchCommand (client *c) { int j; if (c->flags & CLIENT_MULTI) { addReplyError(c,"WATCH inside MULTI is not allowed" ); return ; } for (j = 1 ; j < c->argc; j++) watchForKey(c,c->argv[j]); addReply(c,shared.ok); } void watchForKey (client *c, robj *key) { list *clients = NULL ; listIter li; listNode *ln; watchedKey *wk; listRewind(c->watched_keys,&li); while ((ln = listNext(&li))) { wk = listNodeValue(ln); if (wk->db == c->db && equalStringObjects(key,wk->key)) return ; } clients = dictFetchValue(c->db->watched_keys,key); if (!clients) { clients = listCreate(); dictAdd(c->db->watched_keys,key,clients); incrRefCount(key); } listAddNodeTail(clients,c); wk = zmalloc(sizeof (*wk)); wk->key = key; wk->db = c->db; incrRefCount(key); listAddNodeTail(c->watched_keys,wk); }
unwatch取消监视:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void unwatchCommand (client *c) { unwatchAllKeys(c); c->flags &= (~CLIENT_DIRTY_CAS); addReply(c,shared.ok); } void unwatchAllKeys (client *c) { listIter li; listNode *ln; if (listLength(c->watched_keys) == 0 ) return ; listRewind(c->watched_keys,&li); while ((ln = listNext(&li))) { list *clients; watchedKey *wk; wk = listNodeValue(ln); clients = dictFetchValue(wk->db->watched_keys, wk->key); serverAssertWithInfo(c,NULL ,clients != NULL ); listDelNode(clients,listSearchKey(clients,c)); if (listLength(clients) == 0 ) dictDelete(wk->db->watched_keys, wk->key); listDelNode(c->watched_keys,ln); decrRefCount(wk->key); zfree(wk); } }