首页 > C/C++, Redis > Redis 5.0 重量级特性 Stream 实现源码分析(一)overview,XADD

Redis 5.0 重量级特性 Stream 实现源码分析(一)overview,XADD

2018年8月5日 发表评论 阅读评论 33462次阅读    

kafka在日志处理领域由于其一系列的工具,其他相关服务,基本上算得上是王者,但是缺点也很明显: 难以维护,性能低,需要大量机器,部署复杂,参数众多。
前阵子Redis 5.0 Beta版本发布 , 随之而来的重大特性“Introduction to Redis Streams”, 似乎跟kafka在功能上有很多类似的地方,但也有不少不一样的点。

关于stream的使用,主要的命令是XADD, XREAD,XREADGROUP, XGROUP等指令,具体使用方法可以参考上面的文章,不习惯英文的可以看看这里有篇文章写的也很仔细“挑战Kafka!Redis5.0重量级特性Stream尝鲜”。 这里只分析代码实现方式,功能是怎么实现的。

一、总体介绍

先大概介绍一下几个最重要的点:

  1. stream是一个可持久化的消息队列, 对标kafka,解决了pubsub订阅发布功能不能持久化的问题;
  2. 支持分组多次消费,同样有group,消费者的概念。这样能通过多个group的形式,实现多次消费同一个消息队列,不过redis在这块实现的比较简单,并没有像kafka那样严格的partition,后者在partition上面有复杂的逻辑,比如consumer 挂掉后,其他消费者能一起再选举,选出谁来负责这个对应的partition,一个partition严格只属于某一个consumer来消费。redis在这方面实现比较轻量级;
  3. 支持position,能够消费历史消息,也能轻松移动消费id位置,这样给故障恢复带来了很多可以操作的空间,不至于消息丢了,没办法重复获取历史消息;
  4. 有ACK机制,能够一定程度保证消息的“at least once” 投递。当然带来的问题也显而易见: consumer需要能够支持重放,消息可能重复到来。
  5. 消息可以设置最大保存范围,这样不用担心塞满内存。但是kafka不会立即持久化消息到磁盘,这个依赖aof等常规机制,这一点也很轻量。消息内容是保存在rdb文件里面的,而不是一堆文件。

值得注意的一点是,redis虽然提供了XADD操作,但是不代表redis会主动重传,而是将没有收到XACK的消息保存到了PEL 树里面,等消费者再次来消费的时候,带着一个小id(通常是0-0)的时候,就会将PEL里面的消息发送给这个消费者。所以,没有自动重传的。
接下来了解一下主要的数据结构。

二、重要数据结构

下面先介绍几个重要的用到的数据结构。

0. streamID 消息ID

redis stream的消息ID比较特殊,是一个timestamp-sequencenum的结构,基本上可以理解为先比较时间戳,后比较序列号了。

typedef struct streamID {
    uint64_t ms;        /* Unix time in milliseconds. */
    uint64_t seq;       /* Sequence number. */
} streamID;

1. stream 消息队列

redis的一个stream消息队列用stream结构表示,同样存储在db里面,队列名字就是db里面的key,type用OBJ_STREAM来表示。
里面最重要的衬衣就是rax了,redis引入了一个很重要的树结构,用来做字符串压缩和存储消息队列的,这块后面的文章在写其原理,先介绍一下整体实现方式。
从下面的结构体里面,还有一个rax 是cgroups,用来存储这个stream都有哪些group存在。

typedef struct stream {
    //rax树存储真正的消息内容
    rax *rax;               /* The radix tree holding the stream. */
    //这个stream的消息长度
    uint64_t length;        /* Number of elements inside this stream. */
    //当前stream的最后一个id
    streamID last_id;       /* Zero if there are yet no items. */
    //这个stream有哪些group, group里面会存储对应的有哪些consumers
    rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
} stream;

2. streamCG 消费组

消费组存储在上面stream结构的cgroups里面,并且有几个重要的成员:
1. last_id 每个组的消费者共享一个last_id代表这个组消费到了什么位置,每次投递后会更新这个group。当然是必须指定group的时候会更新。
2. pel 已经发送给客户端,但是还没有收到XACK的消息都存储在pel树里面,顾名思义“Pending entries list”, 在文章开头也说了,redis不会主动重传而是依赖客户端主动来查询其未确认的消息列表。作者建议对于group消费者,每次启动的时候先用ID : 0-0 来查询其未确认消息,然后自主选择怎么处理。
3. consumers 同理,存储了我这个组有哪些消费者,后面介绍消费者结构。

/* Consumer group. */
typedef struct streamCG {
    //当前我这个stream的最大id
    streamID last_id;
    //还没有收到ACK的消息列表
    rax *pel;
    //这个group里面对一个的consumers有哪些, 后者类型为 streamConsumer 
    rax *consumers;
} streamCG;

3. streamConsumer消费者结构

消费者比较简单,名字,活跃时间,以及pel列表,pel列表group里面也有一个,实际上指向都是一样的。

typedef struct streamConsumer {
    mstime_t seen_time;         /* Last time this consumer was active. */
    sds name;                   、
    //待ACK的消息列表,注意这个功能streamCG 里面的实际上指向一个。也就是说一条消息,两个地方都会记住
    rax *pel;
} streamConsumer;

主要的数据结构介绍差不多了,接下来从新增一条消息开始了解一下实现方法。

三、xadd实现原理

XADD 指令由xaddCommand 实现,其语法为:

XADD key [MAXLEN ] [field value] [field value] ...

函数开始例行检查MAXLEN参数,ID参数,设置标记变量和id, maxlen值,如果指定了的话。 然后调用streamTypeLookupWriteOrCreate 来询到对应stream名的robj结构,从中获取stream结构. 如果stream不存在,会创建之。最后通过streamAppendItem 将参数里面的field列表插入到rax树里面去。

    robj *o;
    stream *s;
    //查询到对应stream名的robj结构,从中获取stream结构. 如果stream不存在,会创建之
    if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    s = o->ptr;

    /* Append using the low level function and return the ID. */
    //将后面的参数写入stream中,  stream使用rax树来组织的,然后 每个rax树里面的节点使用listpack列表来组织的,后者又有压缩在里面
    if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
        &id, id_given ? &id : NULL)
        == C_ERR) 
    {        
        addReplyError(c,"The ID specified in XADD is equal or smaller than the "
                        "target stream top item");
        return;              
    }        
    //返回结果
    addReplyStreamID(c,&id);

streamTypeLookupWriteOrCreate 比较简单, 调用createStreamObject 创建一个stream结构。

robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
    robj *o = lookupKeyWrite(c->db,key);
    if (o == NULL) {
        //不存在,创建一个stream结构
        o = createStreamObject();
        dbAdd(c->db,key,o);
    } else {
        if (o->type != OBJ_STREAM) {
            addReply(c,shared.wrongtypeerr);
            return NULL;
        }
    }
    return o;
}

继续看一下stream是怎么创建的:

robj *createStreamObject(void) {
    //创建一个stream结构,并且放到redis的通用robj里面、
    stream *s = streamNew();
    robj *o = createObject(OBJ_STREAM,s);
    o->encoding = OBJ_ENCODING_STREAM;
    return o;
}
stream *streamNew(void) {、
    //新建stream, cgroups暂时置空,需要的时候再分配内存
    stream *s = zmalloc(sizeof(*s));
    s->rax = raxNew();
    s->length = 0;
    s->last_id.ms = 0;
    s->last_id.seq = 0;
    s->cgroups = NULL; /* Created on demand to save memory when not used. */
    return s;
} 

可以看出,stream其实也是作为一个robj结构,类型为 OBJ_STREAM, 存储在数据库里面的,类似于其他list等结构,没有什么特殊的。
streamAppendItem 将后面的参数写入stream中, stream使用rax树来组织的,然后 每个rax树里面的节点使用listpack列表来组织的,后者又有压缩在里面。 这块略复杂,篇幅太长后面文章在继续写,里面最重要的就是有个listpack的结构,能够压缩field 列表的内存空间占用,避免重复的key占用太多空间。
listpack这个还挺期待能够加在其他所有key里面的。

接下来 如果传递了maxlen, 这回进行trim,不过暂时先不深入到里面了, 之后就是处理同步问题,重写一下对应的id字段,redis的上层调用函数会把这条指令propgate广播到从库,果从库也都自动创建id,那时间戳就不一样了,所以这里直接修改上层使用的参数了。

    /* Let's rewrite the ID argument with the one actually generated for
     * AOF/replication propagation. */
    robj *idarg = createObjectFromStreamID(&id);
    rewriteClientCommandArgument(c,i,idarg);
    decrRefCount(idarg);

    //通知等待在这个key上面的客户端,给他们新的内容
    if (server.blocked_clients_by_type[BLOCKED_STREAM]) ///在这个阻塞类型上阻塞的客户端数目,用来加速,没必要的就 不需要进去了
        signalKeyAsReady(c->db, c->argv[1]);
}

上面最重要的函数便是signalKeyAsReady, 会标记这个key有新事件,后面需要处理阻塞相关的事情。这块放最后一节说。
下一节开始写XREAD,XREADGROUP的操作原理。

Share
分类: C/C++, Redis 标签: , ,
  1. kulv
    2018年8月7日23:38 | #1

    @tesion
    是的,跟正常的list, hash 一样, 也是在 feedAppendOnlyFile 里面序列化操作命令到AOF文件;
    rdb就是在rdbSaveObject 里面判断类型是 OBJ_STREAM 然后进行对应的序列号,相对复杂一些,因为有rax树。

  2. tesion
    2018年8月7日11:52 | #2

    你好,想问一下Redis Stream是怎么被持久化到本地的?类似rdb或者aof吗?

  1. 2018年8月5日18:58 | #1

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。