首页 > C/C++, Redis > Redis 5.0 重量级特性 Stream 实现源码分析(二)XREAD 消费流程

Redis 5.0 重量级特性 Stream 实现源码分析(二)XREAD 消费流程

2018年8月5日 发表评论 阅读评论 30994次阅读    

上面的文章整体介绍了stream 实现方式,以及xadd生产端的流程,接下来继续写一下后面消费端的过程。
redis stream的消费方法有几种, XREAD、XREADGROUP, 还有xrange/xrevrange, 后者比较简单,主要就是准备参数然后调用streamReplyWithRange 来根据范围读取消息内容。

一、xrange 范围消息读取

xrange 语法为:

XRANGE key start end [COUNT < n> ]

先来看一下xrange的代码,前面部分例行检查,获取start,end id范围。

void xrangeGenericCommand(client *c, int rev) {
    //读取某一段消息
    robj *o;
    stream *s;
    streamID startid, endid;
    long long count = 0;
    robj *startarg = rev ? c-&gt;argv[3] : c-&gt;argv[2];
    robj *endarg = rev ? c-&gt;argv[2] : c-&gt;argv[3];

    if (streamParseIDOrReply(c,startarg,&amp;startid,0) == C_ERR) return;
    if (streamParseIDOrReply(c,endarg,&amp;endid,UINT64_MAX) == C_ERR) return;

    /* Parse the COUNT option if any. */
    if (c-&gt;argc &gt; 4) {
    //----
    }

    /* Return the specified range to the user. */
    if ((o = lookupKeyReadOrReply(c,c-&gt;argv[1],shared.emptymultibulk)) == NULL
        || checkType(c,o,OBJ_STREAM)) return;
    s = o-&gt;ptr;
    streamReplyWithRange(c,s,&amp;startid,&amp;endid,count,rev,NULL,NULL,0,NULL);
}

得到stream名字后,调用lookupKeyReadOrReply 来查询,注意lookupKeyReadOrReply 函数是任何redis key的查询函数,redis对stream的处理也没有特殊的,还是存储在db里面。
接下来调用streamReplyWithRange 根据group和consumer参数,读取start到end的最多count个元素,可以反向读取。 具体逻辑跟rax树相关,先放后面介绍。 来看看xread的处理方式。

二、XREAD/XREADGROUP group consumer机制实现原理

xread 和 xreadgroup 的处理函数都是 xreadCommand(), 以阻塞读取某个stream内容。两个命令的区别是 后者多带有这两组参数,并且必须同时出现:

XREAD [BLOCK < milliseconds >] [COUNT < count >] STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N
group read多包括这两组参数: [GROUP group-name consumer-name]

0. 参数检查

readCommand 函数相对比较复杂,首先进行参数检查,初始化数据结构,如果指定了groupname,则需要检查这个groupname是否在对应的stream里面存在,以及需要设置对应的每个stream上面的group数据结构 streamCG 指针。
这里要注意的时候,xread支持一次读取多个stream,但是groupname只能指定一个,但是之前的文章我们知道,每个stream其实都有独立的groupname,以及独立的消费者名,所以这里需要拷贝一下group,但是指针不一样。

    //如果指定了group,需要为每个stream申请一个streamCG结构
    if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);

    //循环遍历所有的stream 名字和后面的id字段,进行stream存在性和group存在性检查
    //同时会根据ID来设置对应stream ids的读取起始位置
    for (int i = streams_arg + streams_count; i &lt; c-&gt;argc; i++) {
        //i指向遍历的后面的id部分
        //id_idx 是后面的每个stream对应的开始ID, 第几个stream的id
        int id_idx = i - streams_arg - streams_count;
        //key是对应i的stream位置
        robj *key = c-&gt;argv[i-streams_count];
        robj *o = lookupKeyRead(c-&gt;db,key); //查到对应stream结构
        if (o &amp;&amp; checkType(c,o,OBJ_STREAM)) goto cleanup;
        streamCG *group = NULL;

        /* If a group was specified, than we need to be sure that the
         * key and group actually exist. */
        if (groupname) {
            if (o == NULL || (group = streamLookupCG(o-&gt;ptr,groupname-&gt;ptr)) == NULL)
            { //指定了群组,并且stream不存在,或者stream对应的groupname不存在,报错
                //所以这里有意思,read的时候,如果是readgroup, 那么所哟的stream必须存在,否则单纯的read时stream可以不存在
                addReplyErrorFormat(c, &quot;-NOGROUP ...&quot;,  (char*)key-&gt;ptr,(char*)groupname-&gt;ptr);
                goto cleanup;
            }
            //记录对应的group,也就是对应stream的某个消费组
            groups[id_idx] = group;
        }

接下来是处理参数ID, 如果是"$" 代表当前stream的最后,最大last_id,">" 代表本消费组的最大id. 将结果放在ids[]数组里面。

        if (strcmp(c-&gt;argv[i]-&gt;ptr,&quot;$&quot;) == 0) {
            if (o) {
                stream *s = o-&gt;ptr;
                ids[id_idx] = s-&gt;last_id; //指定的是$, 那么就设置为当前stream的最后一个id
            } else {
                ids[id_idx].ms = 0; //如果这个stream不存在,就为0,什么都要
                ids[id_idx].seq = 0;
            }
            continue;
        } else if (strcmp(c-&gt;argv[i]-&gt;ptr,&quot;&gt;&quot;) == 0) {
            //从本群的最后一个开始,
            if (!xreadgroup || groupname == NULL) {
                addReplyError(c,&quot;The &gt; ID can be specified only when calling &quot;
                                &quot;XREADGROUP using the GROUP &lt;group&gt; &quot;
                                &quot;&lt;consumer&gt; option.&quot;);
                goto cleanup;
            }
            ids[id_idx] = group-&gt;last_id;
            continue;
        }
        if (streamParseIDOrReply(c,c-&gt;argv[i],ids+id_idx,0) != C_OK)
            goto cleanup;
    }

1. 同步读取已有消息

上面初始化groups数组和id数组后,找到了streams_count 个key需要处理,于是循环遍历每一个key,去读取消息。先找到对应stream的结构,然后看对应的stream的最大的id是否比参数里的id要大 , 果是,就有内容,否则没有新的内容,直接跳过继续。
注意count代表的是每个stream最大读取数目,而不是总和。redis不保证返回的数目条数。
看一下代码,具体看注释。

    //遍历每一个stream, 每个stream都最多读取 count个元素
    for (int i = 0; i &lt; streams_count; i++) {
        //先找到对应stream的结构,然后看对应的stream的最大的id是否比参数里的id要大,
        //如果是,就有内容,否则没有新的内容,直接跳过继续
        robj *o = lookupKeyRead(c-&gt;db,c-&gt;argv[streams_arg+i]);
        if (o == NULL) continue;
        stream *s = o-&gt;ptr;
        streamID *gt = ids+i; /* ID must be greater than this. */
        if (s-&gt;last_id.ms &gt; gt-&gt;ms ||
            (s-&gt;last_id.ms == gt-&gt;ms &amp;&amp; s-&gt;last_id.seq &gt; gt-&gt;seq))
        {//当前stream的最大id要大于参数的最大id,有心内容
            arraylen++;
            if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c);
            /* streamReplyWithRange() handles the &#039;start&#039; ID as inclusive,
             * so start from the next ID, since we want only messages with
             * IDs greater than start. */
            streamID start = *gt;//这是开始位置
            start.seq++; /* uint64_t can&#039;t overflow in this context. */

            /* Emit the two elements sub-array consisting of the name
             * of the stream and the data we extracted from it. */

            //组成返回数据结构,bulk
            addReplyMultiBulkLen(c,2);
            addReplyBulk(c,c-&gt;argv[streams_arg+i]);
            streamConsumer *consumer = NULL;
            if (groups){
                //查找这个group里面的consumer, 根据参数的consumername , 如果没有,就会默认创建一个
                consumer = streamLookupConsumer(groups[i], consumername-&gt;ptr,1);
            }
            streamPropInfo spi = {c-&gt;argv[i+streams_arg], groupname};

            //传入group消费组和消费者id, 读取start开始的count个元素, 放到client的发送缓冲区里面
            streamReplyWithRange(c,s,&amp;start,NULL,count,0,
                                 groups ? groups[i] : NULL,
                                 consumer, noack, &amp;spi);
            if (groups) server.dirty++;
        }
    }

上面streamLookupConsumer 函数会读取对应的消费者consumer结构 streamConsumer, 如果没有,就会自动创建,但是redis不会自动删除,因为有消费者id的pel列表存在,需要手动删除。具体可以看看作者这文章(Introduction to Redis Streams)后面“Consumer groups”的说明 ,还是挺复杂的。

streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
    //查找这个group里面的consumer, 根据参数的consumername , 如果没有,就会默认创建一个
    //group对应的consumers列表放在cg-&gt;consumers 的rax树里面 
    streamConsumer *consumer = raxFind(cg-&gt;consumers,(unsigned char*)name,
                               sdslen(name));
    if (consumer == raxNotFound) {
        if (!create) return NULL;
        consumer = zmalloc(sizeof(*consumer));
        consumer-&gt;name = sdsdup(name);
        consumer-&gt;pel = raxNew();

        //group对应的consumer列表放在rax树里面
        raxInsert(cg-&gt;consumers,(unsigned char*)name,sdslen(name),
                  consumer,NULL);
    }
    consumer-&gt;seen_time = mstime();
    return consumer;
}

streamReplyWithRange 传入group消费组和消费者id, 读取start开始的count个元素, 放到client的发送缓冲区里面。
接下来判断一下是否读取到了消息内容,如果读取到了,就会立即给客户端返回,但是数目不保证。如果一条都没有读取到,注意是一条都没有读取到,就会考虑进行sleep阻塞, arraylen 代表有多少个key有事件,并不是有多少消息。

     /* We replied synchronously! Set the top array len and return to caller. */
    //只要成功读取到了内容,就不用等待,因此这里的意思是,只要读取到了一条 就不等待,而不是读取到count条就等待
    //这个一定要注意,不是读取满count条, 而是只要有一条就返回
    if (arraylen) {
        setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
        goto cleanup;
    }

2. BLOCK阻塞等待消息

如果上面同步检查所有stream都没有消息,就会进入到下面的timeout流程,检查是否要等待。如果提供了BLOCK < milisecond >参数,表示最多等多久。
检查timeout 大于0后,会调用blockForKeys 登记这个客户端到每个等待的key的block列表里面, 等有新事件发生的时候,xadd会调用 signalKeyAsReady 来找一个对应的客户端然后通知他的. 果有其他客户端需改对应的key,就会触发handleClientsBlockedOnKeys 函数进行处理。

    //还没有读满,需要继续等待
    /* Block if needed. */
    if (timeout != -1) {
        /* If we are inside a MULTI/EXEC and the list is empty the only thing
         * we can do is treating it as a timeout (even with timeout 0). */
        if (c-&gt;flags &amp; CLIENT_MULTI) {
            addReply(c,shared.nullmultibulk);
            goto cleanup;
        }
        //登记这个客户端到每个等待的key的block列表里面 , 
        //等有新事件发生的时候,xadd会调用 signalKeyAsReady 来找一个对应的客户端然后通知他的
        //如果有其他客户端需改对应的key,就会触发handleClientsBlockedOnKeys 函数进行处理
        blockForKeys(c, BLOCKED_STREAM, c-&gt;argv+streams_arg, streams_count, timeout, NULL, ids);
        /* If no COUNT is given and we block, set a relatively small count:
         * in case the ID provided is too low, we do not want the server to
         * block just to serve this client a huge stream of messages. */
        c-&gt;bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;                                                       

        /* If this is a XREADGROUP + GROUP we need to remember for which
         * group and consumer name we are blocking, so later when one of the
         * keys receive more data, we can call streamReplyWithRange() passing
         * the right arguments. */
        if (groupname) {
            //记录等待的group 名字和消费者名字,在 触发事件后,需要更新group的位置
            incrRefCount(groupname);
            incrRefCount(consumername);
            c-&gt;bpop.xread_group = groupname;
            c-&gt;bpop.xread_consumer = consumername;
        } else {
            c-&gt;bpop.xread_group = NULL;
            c-&gt;bpop.xread_consumer = NULL;
        }
        goto cleanup;
    }

关于block逻辑,blockForKeys 主要涉及到redis之前的block方式,暂时不进入了,主要来介绍一下当key有新消息到来后,stream部分是怎么处理的。 这块涉及到 handleClientsBlockedOnKeys 里面对应不同类型的key的唤醒逻辑。

三、同步阻塞等待读取消息

上面我们知道,redis会调用blockForKeys 来设置当前客户端阻塞在某个key上面,之后请求就会返回,服务器不会真正阻塞的,而只是记录相关的等待结构,然后继续处理别的请求。

当有生产者 XADD 后,就会调用signalKeyAsReady 来触发消息的投递任务。也就是handleClientsBlockedOnKeys 的工作。
processCommand() 里面调用call()处理结束后,判断ready_keys 调用这里来处理这条指令的block列表, 话说这里虽然用了list来传递数据,但其实还是阻塞的形式,要是异步的就好了,对于等待在某个key的客户端列表,使用的是先进后出的优先顺序,当然stream会有group的逻辑在里面。

来看一下handleClientsBlockedOnKeys 代码。

void handleClientsBlockedOnKeys(void) {
    while(listLength(server.ready_keys) != 0) {
        //这个循环上面注释说了,为了避免BRPOPLPUSH 会再产生新的触发事件,所以得再次判断
        list *l;
        //挪出来,以便新的二次触发事件放到ready_keys里面
        l = server.ready_keys;
        server.ready_keys = listCreate();

        while(listLength(l) != 0) { //下面根据这个阻塞的key的类型来分别进行对应的处理
            listNode *ln = listFirst(l);
            readyList *rl = ln-&gt;value;

上面简单初始化,然后遍历所有有更新事件的key,所谓的 readyList , 然后将这个key用lookupKeyWrite获取对应等待的key的类型, robj,从中我们可以获取对应key的类型了。

            /* Serve clients blocked on list key. */
            robj *o = lookupKeyWrite(rl-&gt;db,rl-&gt;key);
            if (o != NULL &amp;&amp; o-&gt;type == OBJ_LIST) {
            }else if (o != NULL &amp;&amp; o-&gt;type == OBJ_ZSET) {
            }else if (o != NULL &amp;&amp; o-&gt;type == OBJ_STREAM) {
                //处理stream类型的等待事件, 有客户端等在这上面 , 先取出在这个key上等待的客户端列表
                //然后
                dictEntry *de = dictFind(rl-&gt;db-&gt;blocking_keys,rl-&gt;key);
                stream *s = o-&gt;ptr;

如果是OBJ_STREAM, 理stream类型的等待事件, 有客户端等在这上面 , 先取出在这个key上等待的客户端列表。 然后一个个判断是否有符合这个客户端的id要求。

    list *clients = dictGetVal(de);
    listNode *ln;
    listIter li;
    listRewind(clients,&amp;li);

    //下面循环所有等待的客户端列表,一个个判断他们是否跟当前key匹配,如果id等都合适,
    //那么调用streamReplyWithRange 去扫描发送消息。同时,如果有多个consumer等待在一个key上的情况
    //怎么处理呢? 不会发生重复吗? 答案当然不会,因为streamReplyWithRange每次回更新最大的last_id,
    //然后下回进来的时候第二个消费者其实不会有什么实际的操作发生
    while((ln = listNext(&amp;li))) {
        client *receiver = listNodeValue(ln);
        //一个客户端可能block在多个key上? 是的
        //每个客户端的bpop结构里面记录了我都阻塞在了哪些key里面
        //但是,同时只能等待在某一类key上面,不能是多种。 因为blockClient 里面会重置btype, 且等待状态会拒绝其他请求
        if (receiver-&gt;btype != BLOCKED_STREAM) continue;
        //从这个客户端的等待的key列表里面找到这个key,然后比较id是否有新的,如果有新的就发送数据并且解开等待状态
        streamID *gt = dictFetchValue(receiver-&gt;bpop.keys, rl-&gt;key);
        if (s-&gt;last_id.ms &gt; gt-&gt;ms ||
            (s-&gt;last_id.ms == gt-&gt;ms &amp;&amp; s-&gt;last_id.seq &gt; gt-&gt;seq))
        {
            streamID start = *gt;//这个客户端等待的开始ID
            start.seq++; /* Can&#039;t overflow, it&#039;s an uint64_t */

            /* If we blocked in the context of a consumer
             * group, we need to resolve the group and
             * consumer here. */
            streamCG *group = NULL;
            streamConsumer *consumer = NULL;
            //如果记录了group信息,那么查找一下这个group和consumer是不是存在
            if (receiver-&gt;bpop.xread_group) {
                group = streamLookupCG(s, receiver-&gt;bpop.xread_group-&gt;ptr);

重点说一下,上面循环所有等待的客户端列表,一个个判断他们是否跟当前key匹配,如果id等都合适,那么调用streamReplyWithRange 去扫描发送消息。
同时,如果有多个consumer等待在一个key上的情况, 怎么处理呢? 不会发生重复吗?
答案当然不会。

因为streamReplyWithRange每次回更新最大的last_id,后下回进来的时候第二个消费者其实不会有什么实际的操作发生。 这就是同组group内消费的时候,之后发生一次消费的原因。但是,这里也能看出来,redis没有kafka的partition方式的核心点,就在于消息是没有分partition的,他们在触发的时候从队列头部取一个客户端后发送出去。
同时,发送给一个客户端后这条消息的ID其实就到了group和consumer的 PEL列表里面了,之后就不会再投递给别人了。怎么实现的呢?后面看streamReplyWithRange 前面部分。

最后调用streamReplyWithRange 将制定范围的消息发送给这个消费者。

            }
            if (group) {
                consumer = streamLookupConsumer(group, receiver-&gt;bpop.xread_consumer-&gt;ptr, 1);
            }

            /* Emit the two elements sub-array consisting of
             * the name of the stream and the data we
             * extracted from it. Wrapped in a single-item
             * array, since we have just one key. */
            addReplyMultiBulkLen(receiver,1);
            addReplyMultiBulkLen(receiver,2);
            addReplyBulk(receiver,rl-&gt;key);

            streamPropInfo pi = {
                rl-&gt;key,
                receiver-&gt;bpop.xread_group
            };
            //调用函数给这个group和consumer 发送指定位置的信息
            streamReplyWithRange(receiver,s,&amp;start,NULL,
                                 receiver-&gt;bpop.xread_count,
                                 0, group, consumer, 0, &amp;pi);

            /* Note that after we unblock the client, &#039;gt&#039;
             * and other receiver-&gt;bpop stuff are no longer
             * valid, so we must do the setup above before
             * this call. */
            //解锁客户端,这样会触发去处理这客户端阻塞期间的其他命令。也会从等待队列等数据中移除
            unblockClient(receiver);
        }
    }

streamReplyWithRange函数给这个group和consumer 发送指定位置的信息, 之后通过调用 unblockClient解锁客户端,这样会触发去处理这客户端阻塞期间的其他命令。也会从等待队列等数据中移除。
这样如果客户端还需要阻塞读取,就需要继续调用XREAD BLOCK timeout 读取。

四、总结

到这里阻塞读取消息队列的方式也讲完了,redis比较巧妙的利用了自己的db数据结构,实现了kafka大量重要的消息队列特性。
包括ACK, group消费, 消息ID支持回溯消费旧消息, 支持多消费者消费同一个队列且互相不重复, 支持多个消费者消费多次同一个队列。
也支持内存保存一部分队列内容。

**期待Redis能继续完善Stream, 改进目前的一些限制,让大家用的更加方便更加爽,能一定程度代替kafka ^.^ 。

后面分个文章讲一下block的原理。

Share
分类: C/C++, Redis 标签: , ,

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。