Redis 5.0 Stream消息队列会每次唤醒所有同一个group内的消费者
Redis stream支持多个消费者在同一个group里面消费,并且一条消息只会发送给一个消费者,不会被同组内的多个consumer获取。但有些特殊情况也不得不注意。
而当所有消费者都BLOCK在一个组内的时候,如果生产者生产一条消息,正常对于kafka来说不会唤醒其他消费者的,而redis目前的实现在这里,会将所有消费者都唤醒,只是只有第一个订阅者才能获取到这条消息,这点在使用的时候需要格外注意,性能也有问题,希望以后redis能改善这情况了。
下面详细说一下出现这种情况的原因。
0. handleClientsBlockedOnKeys 扫描订阅的消费者列表尝试发送消息
在之前的文章 Redis 5.0 重量级特性 Stream 实现源码分析(二)XREAD 消费流程 里面说过,所有消费者阻塞在一个队列上后,如果有新元素插入,就会触发到handleClientsBlockedOnKeys()函数。 函数会遍历所有等待在这个key上面的客户端列表,按照先后顺序看是否要给他发送消息。
list *clients = dictGetVal(de); listNode *ln; listIter li; listRewind(clients,&li); //下面循环所有等待的客户端列表,一个个判断他们是否跟当前key匹配,如果id等都合适, //那么调用streamReplyWithRange 去扫描发送消息。同时,如果有多个consumer等待在一个key上的情况 //怎么处理呢? 不会发生重复吗? 答案当然不会,因为streamReplyWithRange每次回更新最大的last_id, //然后下回进来的时候第二个消费者其实不会有什么实际的操作发生 while((ln = listNext(&li))) { client *receiver = listNodeValue(ln); //一个客户端可能block在多个key上? 是的 //每个客户端的bpop结构里面记录了我都阻塞在了哪些key里面 //但是,同时只能等待在某一类key上面,不能是多种。 因为blockClient 里面会重置btype, 且等待状态会拒绝其他请求 if (receiver->btype != BLOCKED_STREAM) continue; //从这个客户端的等待的key列表里面找到这个key,然后比较id是否有新的,如果有新的就发送数据并且解开等待状态 streamID *gt = dictFetchValue(receiver->bpop.keys, rl->key); if (s->last_id.ms > gt->ms || (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) { //、、、 //调用函数给这个group和consumer 发送指定位置的信息 streamReplyWithRange(receiver,s,&start,NULL, receiver->bpop.xread_count, 0, group, consumer, 0, &pi); }
每次循环的最后,就是调用streamReplyWithRange 函数发送该消费者指定的起始位置往后的消息。这里就得问了,那同组不重复的逻辑在哪实现的呢?
答案是在streamReplyWithRange里面,给客户端找消息之前,会判断当前客户端要求的id位置是不是比所属group的最大id要小,如果要小,就不会将后面的新消息发送给这个消费者。
1. streamReplyWithRange 扫描新消息,去重
来看一下streamReplyWithRange前面部分,对于如果指定了group消费, 参数ID如果小于这个群组的最大id的时候,是什么情况呢?
答案是要么这个客户端要求消费旧消息,要么是因为同时等待,然后第一个消费了后,后面的消费者其实都遇到这种情况。
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamC onsumer *consumer, int flags, streamPropInfo *spi) { //根据group和consumer参数,读取start到end的最多count个元素,可以反向读取 void *arraylen_ptr = NULL; if (group && streamCompareID(start,&group->last_id) <= 0) { //这是个旧id,那么从待完成队列里面去找一下消息然后触发他们,不能发错了 return streamReplyWithRangeFromConsumerPEL代码。
2. streamReplyWithRangeFromConsumerPEL 发送PEL旧消息
来看一下 streamReplyWithRangeFromConsumerPEL代码, 注意下面的注释。
/* This is an helper function for streamReplyWithRange() when called with * group and consumer arguments, but with a range that is referring to already * delivered messages. In this case we just emit messages that are already * in the history of the conusmer, fetching the IDs from its PEL. * * Note that this function does not have a 'rev' argument because it's not * possible to iterate in reverse using a group. Basically this function * is only called as a result of the XREADGROUP command. * * This function is more expensive because it needs to inspect the PEL and then * seek into the radix tree of the messages in order to emit the full message * to the client. However clients only reach this code path when they are * fetching the history of already retrieved messages, which is rare. */ //上面作者说这个函数很少进来,只有客户端获取旧id消息的时候调用。 //但是还有一种情况,多个消费者等待同一个id的时候,除了第一个之外其他都会重复投递吧 //然后这个函数进行重试的时候,就会将所有客户端全唤醒,返回空 size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) { //streamReplyWithRange 调用这里,由于客户端要求的ID比group的最大id小 //为了避免一个ID发给多个人,所以这种时候调用这里来将这个consumer的待确认队列里面的内容全部发送出去 //什么意思呢?就当他要求重发他的历史数据了。 raxIterator ri; unsigned char startkey[sizeof(streamID)]; unsigned char endkey[sizeof(streamID)]; streamEncodeID(startkey,start); if (end) streamEncodeID(endkey,end); size_t arraylen = 0; void *arraylen_ptr = addDeferredMultiBulkLength(c); raxStart(&ri,consumer->pel); raxSeek(&ri,">=",startkey,sizeof(startkey)); while(raxNext(&ri) && (!count || arraylen < count)) { if (end && memcmp(ri.key,end,ri.key_len) > 0) break; streamID thisid; streamDecodeID(ri.key,&thisid); if (streamReplyWithRange(c,s,&thisid,NULL,1,0,NULL,NULL, STREAM_RWR_RAWENTRIES,NULL) == 0) { /* Note that we may have a not acknowledged entry in the PEL * about a message that's no longer here because was removed * by the user by other means. In that case we signal it emitting * the ID but then a NULL entry for the fields. */ addReplyMultiBulkLen(c,2); streamID id; streamDecodeID(ri.key,&id); addReplyStreamID(c,&id); addReply(c,shared.nullmultibulk); } else { streamNACK *nack = ri.data; nack->delivery_time = mstime(); nack->delivery_count++; } arraylen++; } raxStop(&ri); setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); return arraylen; }
streamReplyWithRange 调用streamReplyWithRangeFromConsumerPEL,由于客户端要求的ID比group的最大id小, 为了避免一个ID发给多个人,所以这种时候调用这里来将这个consumer的待确认队列里面的内容全部发送出去。
什么意思呢?就当他要求重发他的历史数据了!
看上面的英文注释,上面作者说这个函数很少进来,只有消费者获取旧id消息的时候调用。但是还有一种情况,多个消费者等待同一个id的时候,除了第一个之外其他都会重复投递吧? 然后这个函数进行重试的时候,就会将所有消费者全唤醒,后面的消费者返回空!
注意上面raxSeek 是从客户端要求的startid开始,这样 多消费者在group内消费的时候,id会不断加大,所以不可能存在PEL消息,比消费者要求的消息还小。也就是说,下面的while进不去。
但是呢,addDeferredMultiBulkLength 已经调用了,会导致给客户端发送空消息的,这样客户端就收到一个空数据包被唤醒,又得继续BLOCK 才行。适用方会很郁闷,来生产一条消息所有消费者都唤醒~
这种情况其实挺耗性能的。
3. 复现测试样例
再举个例子:
C1, C2 消费者消费同一个stream, 在一个group内。 按照如下顺序处理。
C1 | C2 | P1 |
---|---|---|
XREADGROUP GROUP mygroup C1 count 10 block 10000000 streams streamname1 > | ||
XREADGROUP GROUP mygroup C2 count 10 block 10000000 streams streamname1 > | ||
阻塞 | 阻塞 | |
XADD streamname1 * testf1 v1 | ||
唤醒,获得消息testf1 | 唤醒,获得空消息 | |
C1需要重新XREADGROUP | C2也需要重新XREADGROUP |
由于目前这种机制存在,类似于惊群效应,所以其实对网络的消耗,以及消费者频繁被唤醒后还需要再次发送 XREADGROUP BLOCK 去等待消息,性能消耗可能是需要考虑的问题。使用的时候得注意。
近期评论