nsqd 源码分析(5)- 消息的订阅流程
这里记录一下消息的消费者订阅流程,配合上面文章写的消息发送流程。不过这里暂时没讲lookupd的过程,后面在详细介绍。
- 消费者使用TCP协议,发送SUB topic channel 命令订阅到某个channel上,记录其client.Channel = channel,通知c.SubEventChan;
- 消费者启动后台协程protocolV2.messagePump订阅c.SubEventChan 并得知channel有订阅消息;
- 开始订阅管道subChannel.memoryMsgChan/backend, 每个客户端都可以订阅到channel的内存或者磁盘队列里面;
- 待生产者调用第四步后,其中一个client会得到消息:msg := <-memoryMsgChan;
- 客户端记录StartInFlightTimeout的发送中消息队列,进行超时处理;
- SendMessage 将消息+msgid发送给消费者;
- 消费者收到msgid后,发送FIN+msgid通知服务器成功投递消息,可以清空消息了;
下面具体分析每个步骤的关键代码,为了简化只摘抄了关键部分的代码,其余去掉了:
消费者流程:
1. 消费者使用TCP协议,发送SUB topic channel 命令订阅到某个channel上,记录其client.Channel = channel,通知c.SubEventChan;
消息订阅方法如下,其实就是记录一下订阅的topic信息,以及通知后台协程去实际订阅管道:
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { //获取topic和channel,将本客户的加入其client队列 topic := p.ctx.nsqd.GetTopic(topicName) channel = topic.GetChannel(channelName) channel.AddClient(client.ID, client) //下面这行很重要,设置本client说订阅的channel,这样接下来的SubEventChan就会由当前clienid开启的后台协程来订阅这个channel的消息 。 //而这个channel,就是topic->channel结构,里面能找到对应channel的订阅管道 client.Channel = channel // update message pump //通知后台订阅协程来订阅消息,包括内存管道和磁盘 client.SubEventChan <- channel
2. 消费者启动后台协程protocolV2.messagePump订阅c.SubEventChan 并得知channel有订阅消息;
第一步先获得上面步骤所要订阅的channel:
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { //subEventChan 是客户端有订阅行为的通知channel,订阅一次后会重置为null。 subEventChan := client.SubEventChan for { case subChannel = <-subEventChan: //当客户端发送了SUB操作后,会通过这个管道,塞入我所应该订阅的topic,然后这里就得到了对应应该订阅的channel, //然后就在上面的循环中设置对应的memoryMsgChan 或者backend.ReadChan() 进行监听 //所以nsq是通过前台跟客户端交互协程通知后台消息循环协程,你需要订阅一个新的channel并且关注其事件。 这么来订阅的 // you can't SUB anymore subEventChan = nil
得到channel 后放到subChannel,下回循环的时候设置到memoryMsgChan = subChannel.memoryMsgChan/subChannel.backend.ReadChan() 上面.
3. 开始订阅管道subChannel.memoryMsgChan/backend, 每个客户端都可以订阅到channel的内存或者磁盘队列里面;
协程下次循环订阅之上面的管道:
select { case b := <-backendMsgChan: case msg := <-memoryMsgChan:
4. 待生产者调用第四步后,其中一个client会得到消息:msg := <-memoryMsgChan;
//监听这个channel的请求,如果有消息到来,被触发后发送给客户端 subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg)
5. 客户端记录StartInFlightTimeout的发送中消息队列,进行超时处理;
inflight功能用来保证消息的一次到达,所有发送给客户端,但是没收到FIN确认的消息都放到这里面:
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error { //这个函数会在客户端的protocolV2) messagePump 循环体里面,当某个客户端连接得到内存或者磁盘消息后, //在发送前会设置这个inflight队列,用来记录当前正在发送的消息,如果超时时间到来还没有确认收到, //那么会有queueScanLoop循环去扫描,并且重发这条消息, 最后会调用到processInFlightQueue now := time.Now() //下面改clientID会不会有问题?不会,因为一个消息只可能给一个客户端,topic在发送消息到channel的时候 //第0个channel会复用当初的msg结构,之后的会创建一个新的 msg.clientID = clientID msg.deliveryTS = now msg.pri = now.Add(timeout).UnixNano() err := c.pushInFlightMessage(msg) c.addToInFlightPQ(msg) return nil }
6. SendMessage 将消息+msgid发送给消费者;
这个很简单,go语言对于网络请求包装的非常像同步读写,比C简单太多了,不需要处理任何内存结构,buffer组织等,方便到不行:
func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error { p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): writing msg(%s) to client(%s) - %s", msg.ID, client, msg.Body) var buf = &bytes.Buffer{} _, err := msg.WriteTo(buf) err = p.Send(client, frameTypeMessage, buf.Bytes()) return nil }
7. 消费者收到msgid后,发送FIN+msgid通知服务器成功投递消息,可以清空消息了;
最后,客户端收到消息后,发送FIN+msgid, 通知服务器删除消息:
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) { //客户端发送FIN 某条消息来FinishMessage结束消息倒计时,成功投递 //结束消息倒计时,投递完成 err = client.Channel.FinishMessage(client.ID, *id)
FinishMessage 函数就是删除上面2.5步设置的inflight倒计时队列:
func (c *Channel) FinishMessage(clientID int64, id MessageID) error { //收到FIN命令结束倒计时,完美投递 msg, err := c.popInFlightMessage(clientID, id) c.removeFromInFlightPQ(msg) return nil }
总结一下:
1. nsqd每个连接上来的客户端会创建一个protocolV2.messagePump协程负责订阅消息,做超时处理;
1. 客户端发送SUB命令后,SUB()函数会通知客户端的messagePump携程去订阅这个channel的消息;
1. messagePump协程收到订阅更新的管道消息后,会等待在Channel.memoryMsgChan和Channel.backend.ReadChan()上;
1. 只要有生产者发送消息后,channel.memoryMsgChan便会有新的消息到来,其中一个客户端就能获得管道的消息;
1. 拿到消息后调用StartInFlightTimeout将消息放到队列,用来做超时重传,然后调用SendMessage发送给客户端 ;
消费者流程基本就是这样了,稍微要注意的就是nsq的消息有defer和超时重传的功能,两个功能分别由StartDeferredTimeout 和StartInFlightTimeout 两个优先级队列来维护,后面对接的其实是Main() 函数创建的queueScanLoop队列scan扫描协程;
近期评论