首页 > Redis > Redis 5.0 Stream消息队列会每次唤醒所有同一个group内的消费者

Redis 5.0 Stream消息队列会每次唤醒所有同一个group内的消费者

2018年8月5日 发表评论 阅读评论 33674次阅读    

Redis stream支持多个消费者在同一个group里面消费,并且一条消息只会发送给一个消费者,不会被同组内的多个consumer获取。但有些特殊情况也不得不注意。

而当所有消费者都BLOCK在一个组内的时候,如果生产者生产一条消息,正常对于kafka来说不会唤醒其他消费者的,而redis目前的实现在这里,会将所有消费者都唤醒,只是只有第一个订阅者才能获取到这条消息,这点在使用的时候需要格外注意,性能也有问题,希望以后redis能改善这情况了。
下面详细说一下出现这种情况的原因。

0. handleClientsBlockedOnKeys 扫描订阅的消费者列表尝试发送消息

在之前的文章 Redis 5.0 重量级特性 Stream 实现源码分析(二)XREAD 消费流程 里面说过,所有消费者阻塞在一个队列上后,如果有新元素插入,就会触发到handleClientsBlockedOnKeys()函数。 函数会遍历所有等待在这个key上面的客户端列表,按照先后顺序看是否要给他发送消息。

    list *clients = dictGetVal(de);
    listNode *ln;
    listIter li;
    listRewind(clients,&li);

    //下面循环所有等待的客户端列表,一个个判断他们是否跟当前key匹配,如果id等都合适,
    //那么调用streamReplyWithRange 去扫描发送消息。同时,如果有多个consumer等待在一个key上的情况
    //怎么处理呢? 不会发生重复吗? 答案当然不会,因为streamReplyWithRange每次回更新最大的last_id,
    //然后下回进来的时候第二个消费者其实不会有什么实际的操作发生
    while((ln = listNext(&li))) {
        client *receiver = listNodeValue(ln);
        //一个客户端可能block在多个key上? 是的
        //每个客户端的bpop结构里面记录了我都阻塞在了哪些key里面
        //但是,同时只能等待在某一类key上面,不能是多种。 因为blockClient 里面会重置btype, 且等待状态会拒绝其他请求
        if (receiver->btype != BLOCKED_STREAM) continue;
        //从这个客户端的等待的key列表里面找到这个key,然后比较id是否有新的,如果有新的就发送数据并且解开等待状态
        streamID *gt = dictFetchValue(receiver->bpop.keys, rl->key);
        if (s->last_id.ms > gt->ms ||
            (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
        {
            //、、、
                        //调用函数给这个group和consumer 发送指定位置的信息
            streamReplyWithRange(receiver,s,&start,NULL,
                                 receiver->bpop.xread_count,
                                 0, group, consumer, 0, &pi);

        }

每次循环的最后,就是调用streamReplyWithRange 函数发送该消费者指定的起始位置往后的消息。这里就得问了,那同组不重复的逻辑在哪实现的呢?
答案是在streamReplyWithRange里面,给客户端找消息之前,会判断当前客户端要求的id位置是不是比所属group的最大id要小,如果要小,就不会将后面的新消息发送给这个消费者。

1. streamReplyWithRange 扫描新消息,去重

来看一下streamReplyWithRange前面部分,对于如果指定了group消费, 参数ID如果小于这个群组的最大id的时候,是什么情况呢?
答案是要么这个客户端要求消费旧消息,要么是因为同时等待,然后第一个消费了后,后面的消费者其实都遇到这种情况。

size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamC
onsumer *consumer, int flags, streamPropInfo *spi) {
    //根据group和consumer参数,读取start到end的最多count个元素,可以反向读取
    void *arraylen_ptr = NULL;
    if (group && streamCompareID(start,&group->last_id) <= 0) {
        //这是个旧id,那么从待完成队列里面去找一下消息然后触发他们,不能发错了
        return streamReplyWithRangeFromConsumerPEL代码。
2. streamReplyWithRangeFromConsumerPEL 发送PEL旧消息

来看一下 streamReplyWithRangeFromConsumerPEL代码, 注意下面的注释。

/* This is an helper function for streamReplyWithRange() when called with
 * group and consumer arguments, but with a range that is referring to already
 * delivered messages. In this case we just emit messages that are already
 * in the history of the conusmer, fetching the IDs from its PEL.
 *
 * Note that this function does not have a 'rev' argument because it's not
 * possible to iterate in reverse using a group. Basically this function
 * is only called as a result of the XREADGROUP command.
 *
 * This function is more expensive because it needs to inspect the PEL and then
 * seek into the radix tree of the messages in order to emit the full message
 * to the client. However clients only reach this code path when they are
 * fetching the history of already retrieved messages, which is rare. */
//上面作者说这个函数很少进来,只有客户端获取旧id消息的时候调用。
//但是还有一种情况,多个消费者等待同一个id的时候,除了第一个之外其他都会重复投递吧
//然后这个函数进行重试的时候,就会将所有客户端全唤醒,返回空
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
    //streamReplyWithRange 调用这里,由于客户端要求的ID比group的最大id小
    //为了避免一个ID发给多个人,所以这种时候调用这里来将这个consumer的待确认队列里面的内容全部发送出去
    //什么意思呢?就当他要求重发他的历史数据了。
    raxIterator ri;
    unsigned char startkey[sizeof(streamID)];
    unsigned char endkey[sizeof(streamID)];
    streamEncodeID(startkey,start);
    if (end) streamEncodeID(endkey,end);

    size_t arraylen = 0;
    void *arraylen_ptr = addDeferredMultiBulkLength(c);
    raxStart(&ri,consumer->pel);
    raxSeek(&ri,">=",startkey,sizeof(startkey));
    while(raxNext(&ri) && (!count || arraylen < count)) {
        if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
        streamID thisid;
        streamDecodeID(ri.key,&thisid);
        if (streamReplyWithRange(c,s,&thisid,NULL,1,0,NULL,NULL,
                                 STREAM_RWR_RAWENTRIES,NULL) == 0)
        {
            /* Note that we may have a not acknowledged entry in the PEL
             * about a message that's no longer here because was removed
             * by the user by other means. In that case we signal it emitting
             * the ID but then a NULL entry for the fields. */
            addReplyMultiBulkLen(c,2);
            streamID id;
            streamDecodeID(ri.key,&id);
            addReplyStreamID(c,&id);
            addReply(c,shared.nullmultibulk);
        } else {
            streamNACK *nack = ri.data;
            nack->delivery_time = mstime();
            nack->delivery_count++;
        }
        arraylen++;
    }
    raxStop(&ri);
    setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
    return arraylen;
}

streamReplyWithRange 调用streamReplyWithRangeFromConsumerPEL,由于客户端要求的ID比group的最大id小, 为了避免一个ID发给多个人,所以这种时候调用这里来将这个consumer的待确认队列里面的内容全部发送出去。

什么意思呢?就当他要求重发他的历史数据了!

看上面的英文注释,上面作者说这个函数很少进来,只有消费者获取旧id消息的时候调用。但是还有一种情况,多个消费者等待同一个id的时候,除了第一个之外其他都会重复投递吧? 然后这个函数进行重试的时候,就会将所有消费者全唤醒,后面的消费者返回空!

注意上面raxSeek 是从客户端要求的startid开始,这样 多消费者在group内消费的时候,id会不断加大,所以不可能存在PEL消息,比消费者要求的消息还小。也就是说,下面的while进不去。
但是呢,addDeferredMultiBulkLength 已经调用了,会导致给客户端发送空消息的,这样客户端就收到一个空数据包被唤醒,又得继续BLOCK 才行。适用方会很郁闷,来生产一条消息所有消费者都唤醒~

这种情况其实挺耗性能的。

3. 复现测试样例

再举个例子:
C1, C2 消费者消费同一个stream, 在一个group内。 按照如下顺序处理。

C1 C2 P1
XREADGROUP GROUP mygroup C1 count 10 block 10000000 streams streamname1 >
XREADGROUP GROUP mygroup C2 count 10 block 10000000 streams streamname1 >
阻塞 阻塞
XADD streamname1 * testf1 v1
唤醒,获得消息testf1 唤醒,获得空消息
C1需要重新XREADGROUP C2也需要重新XREADGROUP

由于目前这种机制存在,类似于惊群效应,所以其实对网络的消耗,以及消费者频繁被唤醒后还需要再次发送 XREADGROUP BLOCK 去等待消息,性能消耗可能是需要考虑的问题。使用的时候得注意。

Share
分类: Redis 标签: , ,
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。