Redis 5.0 重量级特性 Stream 实现源码分析(一)overview,XADD
kafka在日志处理领域由于其一系列的工具,其他相关服务,基本上算得上是王者,但是缺点也很明显: 难以维护,性能低,需要大量机器,部署复杂,参数众多。
前阵子Redis 5.0 Beta版本发布 , 随之而来的重大特性“Introduction to Redis Streams”, 似乎跟kafka在功能上有很多类似的地方,但也有不少不一样的点。
关于stream的使用,主要的命令是XADD, XREAD,XREADGROUP, XGROUP等指令,具体使用方法可以参考上面的文章,不习惯英文的可以看看这里有篇文章写的也很仔细“挑战Kafka!Redis5.0重量级特性Stream尝鲜”。 这里只分析代码实现方式,功能是怎么实现的。
一、总体介绍
先大概介绍一下几个最重要的点:
- stream是一个可持久化的消息队列, 对标kafka,解决了pubsub订阅发布功能不能持久化的问题;
- 支持分组多次消费,同样有group,消费者的概念。这样能通过多个group的形式,实现多次消费同一个消息队列,不过redis在这块实现的比较简单,并没有像kafka那样严格的partition,后者在partition上面有复杂的逻辑,比如consumer 挂掉后,其他消费者能一起再选举,选出谁来负责这个对应的partition,一个partition严格只属于某一个consumer来消费。redis在这方面实现比较轻量级;
- 支持position,能够消费历史消息,也能轻松移动消费id位置,这样给故障恢复带来了很多可以操作的空间,不至于消息丢了,没办法重复获取历史消息;
- 有ACK机制,能够一定程度保证消息的“at least once” 投递。当然带来的问题也显而易见: consumer需要能够支持重放,消息可能重复到来。
- 消息可以设置最大保存范围,这样不用担心塞满内存。但是kafka不会立即持久化消息到磁盘,这个依赖aof等常规机制,这一点也很轻量。消息内容是保存在rdb文件里面的,而不是一堆文件。
值得注意的一点是,redis虽然提供了XADD操作,但是不代表redis会主动重传,而是将没有收到XACK的消息保存到了PEL 树里面,等消费者再次来消费的时候,带着一个小id(通常是0-0)的时候,就会将PEL里面的消息发送给这个消费者。所以,没有自动重传的。
接下来了解一下主要的数据结构。
二、重要数据结构
下面先介绍几个重要的用到的数据结构。
0. streamID 消息ID
redis stream的消息ID比较特殊,是一个timestamp-sequencenum的结构,基本上可以理解为先比较时间戳,后比较序列号了。
typedef struct streamID { uint64_t ms; /* Unix time in milliseconds. */ uint64_t seq; /* Sequence number. */ } streamID;
1. stream 消息队列
redis的一个stream消息队列用stream结构表示,同样存储在db里面,队列名字就是db里面的key,type用OBJ_STREAM来表示。
里面最重要的衬衣就是rax了,redis引入了一个很重要的树结构,用来做字符串压缩和存储消息队列的,这块后面的文章在写其原理,先介绍一下整体实现方式。
从下面的结构体里面,还有一个rax 是cgroups,用来存储这个stream都有哪些group存在。
typedef struct stream { //rax树存储真正的消息内容 rax *rax; /* The radix tree holding the stream. */ //这个stream的消息长度 uint64_t length; /* Number of elements inside this stream. */ //当前stream的最后一个id streamID last_id; /* Zero if there are yet no items. */ //这个stream有哪些group, group里面会存储对应的有哪些consumers rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ } stream;
2. streamCG 消费组
消费组存储在上面stream结构的cgroups里面,并且有几个重要的成员:
1. last_id 每个组的消费者共享一个last_id代表这个组消费到了什么位置,每次投递后会更新这个group。当然是必须指定group的时候会更新。
2. pel 已经发送给客户端,但是还没有收到XACK的消息都存储在pel树里面,顾名思义“Pending entries list”, 在文章开头也说了,redis不会主动重传而是依赖客户端主动来查询其未确认的消息列表。作者建议对于group消费者,每次启动的时候先用ID : 0-0 来查询其未确认消息,然后自主选择怎么处理。
3. consumers 同理,存储了我这个组有哪些消费者,后面介绍消费者结构。
/* Consumer group. */ typedef struct streamCG { //当前我这个stream的最大id streamID last_id; //还没有收到ACK的消息列表 rax *pel; //这个group里面对一个的consumers有哪些, 后者类型为 streamConsumer rax *consumers; } streamCG;
3. streamConsumer消费者结构
消费者比较简单,名字,活跃时间,以及pel列表,pel列表group里面也有一个,实际上指向都是一样的。
typedef struct streamConsumer { mstime_t seen_time; /* Last time this consumer was active. */ sds name; 、 //待ACK的消息列表,注意这个功能streamCG 里面的实际上指向一个。也就是说一条消息,两个地方都会记住 rax *pel; } streamConsumer;
主要的数据结构介绍差不多了,接下来从新增一条消息开始了解一下实现方法。
三、xadd实现原理
XADD 指令由xaddCommand 实现,其语法为:
XADD key [MAXLEN
] [field value] [field value] ...
函数开始例行检查MAXLEN参数,ID参数,设置标记变量和id, maxlen值,如果指定了的话。 然后调用streamTypeLookupWriteOrCreate 来询到对应stream名的robj结构,从中获取stream结构. 如果stream不存在,会创建之。最后通过streamAppendItem 将参数里面的field列表插入到rax树里面去。
robj *o; stream *s; //查询到对应stream名的robj结构,从中获取stream结构. 如果stream不存在,会创建之 if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; s = o->ptr; /* Append using the low level function and return the ID. */ //将后面的参数写入stream中, stream使用rax树来组织的,然后 每个rax树里面的节点使用listpack列表来组织的,后者又有压缩在里面 if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, &id, id_given ? &id : NULL) == C_ERR) { addReplyError(c,"The ID specified in XADD is equal or smaller than the " "target stream top item"); return; } //返回结果 addReplyStreamID(c,&id);
streamTypeLookupWriteOrCreate 比较简单, 调用createStreamObject 创建一个stream结构。
robj *streamTypeLookupWriteOrCreate(client *c, robj *key) { robj *o = lookupKeyWrite(c->db,key); if (o == NULL) { //不存在,创建一个stream结构 o = createStreamObject(); dbAdd(c->db,key,o); } else { if (o->type != OBJ_STREAM) { addReply(c,shared.wrongtypeerr); return NULL; } } return o; }
继续看一下stream是怎么创建的:
robj *createStreamObject(void) { //创建一个stream结构,并且放到redis的通用robj里面、 stream *s = streamNew(); robj *o = createObject(OBJ_STREAM,s); o->encoding = OBJ_ENCODING_STREAM; return o; } stream *streamNew(void) {、 //新建stream, cgroups暂时置空,需要的时候再分配内存 stream *s = zmalloc(sizeof(*s)); s->rax = raxNew(); s->length = 0; s->last_id.ms = 0; s->last_id.seq = 0; s->cgroups = NULL; /* Created on demand to save memory when not used. */ return s; }
可以看出,stream其实也是作为一个robj结构,类型为 OBJ_STREAM, 存储在数据库里面的,类似于其他list等结构,没有什么特殊的。
streamAppendItem 将后面的参数写入stream中, stream使用rax树来组织的,然后 每个rax树里面的节点使用listpack列表来组织的,后者又有压缩在里面。 这块略复杂,篇幅太长后面文章在继续写,里面最重要的就是有个listpack的结构,能够压缩field 列表的内存空间占用,避免重复的key占用太多空间。
listpack这个还挺期待能够加在其他所有key里面的。
接下来 如果传递了maxlen, 这回进行trim,不过暂时先不深入到里面了, 之后就是处理同步问题,重写一下对应的id字段,redis的上层调用函数会把这条指令propgate广播到从库,果从库也都自动创建id,那时间戳就不一样了,所以这里直接修改上层使用的参数了。
/* Let's rewrite the ID argument with the one actually generated for * AOF/replication propagation. */ robj *idarg = createObjectFromStreamID(&id); rewriteClientCommandArgument(c,i,idarg); decrRefCount(idarg); //通知等待在这个key上面的客户端,给他们新的内容 if (server.blocked_clients_by_type[BLOCKED_STREAM]) ///在这个阻塞类型上阻塞的客户端数目,用来加速,没必要的就 不需要进去了 signalKeyAsReady(c->db, c->argv[1]); }
上面最重要的函数便是signalKeyAsReady, 会标记这个key有新事件,后面需要处理阻塞相关的事情。这块放最后一节说。
下一节开始写XREAD,XREADGROUP的操作原理。
@tesion
是的,跟正常的list, hash 一样, 也是在 feedAppendOnlyFile 里面序列化操作命令到AOF文件;
rdb就是在rdbSaveObject 里面判断类型是 OBJ_STREAM 然后进行对应的序列号,相对复杂一些,因为有rax树。
你好,想问一下Redis Stream是怎么被持久化到本地的?类似rdb或者aof吗?