Redis 5.0 Stream消息队列会每次唤醒所有同一个group内的消费者
Redis stream支持多个消费者在同一个group里面消费,并且一条消息只会发送给一个消费者,不会被同组内的多个consumer获取。但有些特殊情况也不得不注意。
而当所有消费者都BLOCK在一个组内的时候,如果生产者生产一条消息,正常对于kafka来说不会唤醒其他消费者的,而redis目前的实现在这里,会将所有消费者都唤醒,只是只有第一个订阅者才能获取到这条消息,这点在使用的时候需要格外注意,性能也有问题,希望以后redis能改善这情况了。
下面详细说一下出现这种情况的原因。
0. handleClientsBlockedOnKeys 扫描订阅的消费者列表尝试发送消息
在之前的文章 Redis 5.0 重量级特性 Stream 实现源码分析(二)XREAD 消费流程 里面说过,所有消费者阻塞在一个队列上后,如果有新元素插入,就会触发到handleClientsBlockedOnKeys()函数。 函数会遍历所有等待在这个key上面的客户端列表,按照先后顺序看是否要给他发送消息。
list *clients = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(clients,&li);
//下面循环所有等待的客户端列表,一个个判断他们是否跟当前key匹配,如果id等都合适,
//那么调用streamReplyWithRange 去扫描发送消息。同时,如果有多个consumer等待在一个key上的情况
//怎么处理呢? 不会发生重复吗? 答案当然不会,因为streamReplyWithRange每次回更新最大的last_id,
//然后下回进来的时候第二个消费者其实不会有什么实际的操作发生
while((ln = listNext(&li))) {
client *receiver = listNodeValue(ln);
//一个客户端可能block在多个key上? 是的
//每个客户端的bpop结构里面记录了我都阻塞在了哪些key里面
//但是,同时只能等待在某一类key上面,不能是多种。 因为blockClient 里面会重置btype, 且等待状态会拒绝其他请求
if (receiver->btype != BLOCKED_STREAM) continue;
//从这个客户端的等待的key列表里面找到这个key,然后比较id是否有新的,如果有新的就发送数据并且解开等待状态
streamID *gt = dictFetchValue(receiver->bpop.keys, rl->key);
if (s->last_id.ms > gt->ms ||
(s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
{
//、、、
//调用函数给这个group和consumer 发送指定位置的信息
streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count,
0, group, consumer, 0, &pi);
}
每次循环的最后,就是调用streamReplyWithRange 函数发送该消费者指定的起始位置往后的消息。这里就得问了,那同组不重复的逻辑在哪实现的呢?
答案是在streamReplyWithRange里面,给客户端找消息之前,会判断当前客户端要求的id位置是不是比所属group的最大id要小,如果要小,就不会将后面的新消息发送给这个消费者。
1. streamReplyWithRange 扫描新消息,去重
来看一下streamReplyWithRange前面部分,对于如果指定了group消费, 参数ID如果小于这个群组的最大id的时候,是什么情况呢?
答案是要么这个客户端要求消费旧消息,要么是因为同时等待,然后第一个消费了后,后面的消费者其实都遇到这种情况。
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamC
onsumer *consumer, int flags, streamPropInfo *spi) {
//根据group和consumer参数,读取start到end的最多count个元素,可以反向读取
void *arraylen_ptr = NULL;
if (group && streamCompareID(start,&group->last_id) <= 0) {
//这是个旧id,那么从待完成队列里面去找一下消息然后触发他们,不能发错了
return streamReplyWithRangeFromConsumerPEL代码。
2. streamReplyWithRangeFromConsumerPEL 发送PEL旧消息
来看一下 streamReplyWithRangeFromConsumerPEL代码, 注意下面的注释。
/* This is an helper function for streamReplyWithRange() when called with
* group and consumer arguments, but with a range that is referring to already
* delivered messages. In this case we just emit messages that are already
* in the history of the conusmer, fetching the IDs from its PEL.
*
* Note that this function does not have a 'rev' argument because it's not
* possible to iterate in reverse using a group. Basically this function
* is only called as a result of the XREADGROUP command.
*
* This function is more expensive because it needs to inspect the PEL and then
* seek into the radix tree of the messages in order to emit the full message
* to the client. However clients only reach this code path when they are
* fetching the history of already retrieved messages, which is rare. */
//上面作者说这个函数很少进来,只有客户端获取旧id消息的时候调用。
//但是还有一种情况,多个消费者等待同一个id的时候,除了第一个之外其他都会重复投递吧
//然后这个函数进行重试的时候,就会将所有客户端全唤醒,返回空
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
//streamReplyWithRange 调用这里,由于客户端要求的ID比group的最大id小
//为了避免一个ID发给多个人,所以这种时候调用这里来将这个consumer的待确认队列里面的内容全部发送出去
//什么意思呢?就当他要求重发他的历史数据了。
raxIterator ri;
unsigned char startkey[sizeof(streamID)];
unsigned char endkey[sizeof(streamID)];
streamEncodeID(startkey,start);
if (end) streamEncodeID(endkey,end);
size_t arraylen = 0;
void *arraylen_ptr = addDeferredMultiBulkLength(c);
raxStart(&ri,consumer->pel);
raxSeek(&ri,">=",startkey,sizeof(startkey));
while(raxNext(&ri) && (!count || arraylen < count)) {
if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
streamID thisid;
streamDecodeID(ri.key,&thisid);
if (streamReplyWithRange(c,s,&thisid,NULL,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL) == 0)
{
/* Note that we may have a not acknowledged entry in the PEL
* about a message that's no longer here because was removed
* by the user by other means. In that case we signal it emitting
* the ID but then a NULL entry for the fields. */
addReplyMultiBulkLen(c,2);
streamID id;
streamDecodeID(ri.key,&id);
addReplyStreamID(c,&id);
addReply(c,shared.nullmultibulk);
} else {
streamNACK *nack = ri.data;
nack->delivery_time = mstime();
nack->delivery_count++;
}
arraylen++;
}
raxStop(&ri);
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
return arraylen;
}
streamReplyWithRange 调用streamReplyWithRangeFromConsumerPEL,由于客户端要求的ID比group的最大id小, 为了避免一个ID发给多个人,所以这种时候调用这里来将这个consumer的待确认队列里面的内容全部发送出去。
什么意思呢?就当他要求重发他的历史数据了!
看上面的英文注释,上面作者说这个函数很少进来,只有消费者获取旧id消息的时候调用。但是还有一种情况,多个消费者等待同一个id的时候,除了第一个之外其他都会重复投递吧? 然后这个函数进行重试的时候,就会将所有消费者全唤醒,后面的消费者返回空!
注意上面raxSeek 是从客户端要求的startid开始,这样 多消费者在group内消费的时候,id会不断加大,所以不可能存在PEL消息,比消费者要求的消息还小。也就是说,下面的while进不去。
但是呢,addDeferredMultiBulkLength 已经调用了,会导致给客户端发送空消息的,这样客户端就收到一个空数据包被唤醒,又得继续BLOCK 才行。适用方会很郁闷,来生产一条消息所有消费者都唤醒~
这种情况其实挺耗性能的。
3. 复现测试样例
再举个例子:
C1, C2 消费者消费同一个stream, 在一个group内。 按照如下顺序处理。
| C1 | C2 | P1 |
|---|---|---|
| XREADGROUP GROUP mygroup C1 count 10 block 10000000 streams streamname1 > | ||
| XREADGROUP GROUP mygroup C2 count 10 block 10000000 streams streamname1 > | ||
| 阻塞 | 阻塞 | |
| XADD streamname1 * testf1 v1 | ||
| 唤醒,获得消息testf1 | 唤醒,获得空消息 | |
| C1需要重新XREADGROUP | C2也需要重新XREADGROUP |
由于目前这种机制存在,类似于惊群效应,所以其实对网络的消耗,以及消费者频繁被唤醒后还需要再次发送 XREADGROUP BLOCK 去等待消息,性能消耗可能是需要考虑的问题。使用的时候得注意。

近期评论