首页 > GO, nsqd, nsqd > nsqd 源码分析(5)- 消息的订阅流程

nsqd 源码分析(5)- 消息的订阅流程

2018年5月27日 发表评论 阅读评论 29511次阅读    

这里记录一下消息的消费者订阅流程,配合上面文章写的消息发送流程。不过这里暂时没讲lookupd的过程,后面在详细介绍。

  1. 消费者使用TCP协议,发送SUB topic channel 命令订阅到某个channel上,记录其client.Channel = channel,通知c.SubEventChan;
  2. 消费者启动后台协程protocolV2.messagePump订阅c.SubEventChan 并得知channel有订阅消息;
  3. 开始订阅管道subChannel.memoryMsgChan/backend, 每个客户端都可以订阅到channel的内存或者磁盘队列里面;
  4. 待生产者调用第四步后,其中一个client会得到消息:msg := <-memoryMsgChan;
  5. 客户端记录StartInFlightTimeout的发送中消息队列,进行超时处理;
  6. SendMessage 将消息+msgid发送给消费者;
  7. 消费者收到msgid后,发送FIN+msgid通知服务器成功投递消息,可以清空消息了;


下面具体分析每个步骤的关键代码,为了简化只摘抄了关键部分的代码,其余去掉了:

消费者流程:


1. 消费者使用TCP协议,发送SUB topic channel 命令订阅到某个channel上,记录其client.Channel = channel,通知c.SubEventChan;

消息订阅方法如下,其实就是记录一下订阅的topic信息,以及通知后台协程去实际订阅管道:

func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
       //获取topic和channel,将本客户的加入其client队列
        topic := p.ctx.nsqd.GetTopic(topicName)
        channel = topic.GetChannel(channelName)
        channel.AddClient(client.ID, client)
    //下面这行很重要,设置本client说订阅的channel,这样接下来的SubEventChan就会由当前clienid开启的后台协程来订阅这个channel的消息
。
    //而这个channel,就是topic-&gt;channel结构,里面能找到对应channel的订阅管道
    client.Channel = channel
    // update message pump
    //通知后台订阅协程来订阅消息,包括内存管道和磁盘
    client.SubEventChan &lt;- channel

2. 消费者启动后台协程protocolV2.messagePump订阅c.SubEventChan 并得知channel有订阅消息;

第一步先获得上面步骤所要订阅的channel:

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    //subEventChan 是客户端有订阅行为的通知channel,订阅一次后会重置为null。
    subEventChan := client.SubEventChan
   for {
               case subChannel = &lt;-subEventChan:
            //当客户端发送了SUB操作后,会通过这个管道,塞入我所应该订阅的topic,然后这里就得到了对应应该订阅的channel,
            //然后就在上面的循环中设置对应的memoryMsgChan 或者backend.ReadChan() 进行监听                                        
            //所以nsq是通过前台跟客户端交互协程通知后台消息循环协程,你需要订阅一个新的channel并且关注其事件。 这么来订阅的
            // you can&#039;t SUB anymore
            subEventChan = nil

得到channel 后放到subChannel,下回循环的时候设置到memoryMsgChan = subChannel.memoryMsgChan/subChannel.backend.ReadChan() 上面.

3. 开始订阅管道subChannel.memoryMsgChan/backend, 每个客户端都可以订阅到channel的内存或者磁盘队列里面;

协程下次循环订阅之上面的管道:

        select {
        case b := &lt;-backendMsgChan:
        case msg := &lt;-memoryMsgChan:

4. 待生产者调用第四步后,其中一个client会得到消息:msg := <-memoryMsgChan;

            //监听这个channel的请求,如果有消息到来,被触发后发送给客户端
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg) 

5. 客户端记录StartInFlightTimeout的发送中消息队列,进行超时处理;

inflight功能用来保证消息的一次到达,所有发送给客户端,但是没收到FIN确认的消息都放到这里面:

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {                              
    //这个函数会在客户端的protocolV2) messagePump 循环体里面,当某个客户端连接得到内存或者磁盘消息后,
    //在发送前会设置这个inflight队列,用来记录当前正在发送的消息,如果超时时间到来还没有确认收到,
    //那么会有queueScanLoop循环去扫描,并且重发这条消息, 最后会调用到processInFlightQueue
    now := time.Now() 
    //下面改clientID会不会有问题?不会,因为一个消息只可能给一个客户端,topic在发送消息到channel的时候
    //第0个channel会复用当初的msg结构,之后的会创建一个新的
    msg.clientID = clientID
    msg.deliveryTS = now
    msg.pri = now.Add(timeout).UnixNano()
    err := c.pushInFlightMessage(msg)
    c.addToInFlightPQ(msg)
    return nil
}

6. SendMessage 将消息+msgid发送给消费者;

这个很简单,go语言对于网络请求包装的非常像同步读写,比C简单太多了,不需要处理任何内存结构,buffer组织等,方便到不行:

func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error {
    p.ctx.nsqd.logf(LOG_DEBUG, &quot;PROTOCOL(V2): writing msg(%s) to client(%s) - %s&quot;, msg.ID, client, msg.Body)
    var buf = &amp;bytes.Buffer{}
    _, err := msg.WriteTo(buf)

    err = p.Send(client, frameTypeMessage, buf.Bytes())
    return nil
}

7. 消费者收到msgid后,发送FIN+msgid通知服务器成功投递消息,可以清空消息了;

最后,客户端收到消息后,发送FIN+msgid, 通知服务器删除消息:

func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {
    //客户端发送FIN 某条消息来FinishMessage结束消息倒计时,成功投递 
    //结束消息倒计时,投递完成
    err = client.Channel.FinishMessage(client.ID, *id)

FinishMessage 函数就是删除上面2.5步设置的inflight倒计时队列:

func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
    //收到FIN命令结束倒计时,完美投递
    msg, err := c.popInFlightMessage(clientID, id)
    c.removeFromInFlightPQ(msg)
    return nil
}

总结一下:

1. nsqd每个连接上来的客户端会创建一个protocolV2.messagePump协程负责订阅消息,做超时处理;

1. 客户端发送SUB命令后,SUB()函数会通知客户端的messagePump携程去订阅这个channel的消息;

1. messagePump协程收到订阅更新的管道消息后,会等待在Channel.memoryMsgChan和Channel.backend.ReadChan()上;

1. 只要有生产者发送消息后,channel.memoryMsgChan便会有新的消息到来,其中一个客户端就能获得管道的消息;

1. 拿到消息后调用StartInFlightTimeout将消息放到队列,用来做超时重传,然后调用SendMessage发送给客户端 ;

消费者流程基本就是这样了,稍微要注意的就是nsq的消息有defer和超时重传的功能,两个功能分别由StartDeferredTimeout 和StartInFlightTimeout 两个优先级队列来维护,后面对接的其实是Main() 函数创建的queueScanLoop队列scan扫描协程;

Share
分类: GO, nsqd, nsqd 标签: , ,
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。