首页 > GO, nsqd, nsqd > nsqd 源码分析(4)- 消息的发送流程

nsqd 源码分析(4)- 消息的发送流程

2018年5月27日 发表评论 阅读评论 29271次阅读    

本文从消息发送者的PUB到最后消息被SUB接收,整个流程串起来讲一下nsqd是怎么接收消息的,这部分先写发送流程。

  1. 生产者PUB topic消息;
  2. topic.PutMessage 到 topic.memoryMsgChan;
  3. 每topic消息有后台协程topic.messagePump进行Topic.memoryMsgChan监听;
  4. topic.messagePump 遍历每个channel调用channel.PutMessage发送消息到channel.memoryMsgChan ;

生产者流程:

下面具体分析每个步骤的关键代码,为了简化只摘抄了关键部分的代码,其余去掉了:

1 生产者PUB topic消息;

每一个生产者都会单独创建一个IOLoop协程,进行同步(实际上异步)的处理,首先会创建messagePump协程做后台订阅,发送任务。

func (p *protocolV2) IOLoop(conn net.Conn) error {
    //nsqd/tcp.go 的tcpServer.Handle 在读取前面4个字节的版本后调用这里,开始去读取客户端请求然后处理的过程,
    clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
    //创建client结构
    client := newClientV2(clientID, conn, p.ctx)
    //上面注释说了,这里做同步的原因在于,需要确保messagePump 里面的初始化完成在进行下面的操作,
    //因为当前客户端在后面可能修改相关的数据。
    //消息的订阅发布工作在辅助的messagePump携程处理,下面创建之
    messagePumpStartedChan := make(chan bool)
    go p.messagePump(client, messagePumpStartedChan)
    <-messagePumpStartedChan
    //开始循环读取客户端请求然后解析参数,进行处理, 这个工作在客户端的主协程处理
    for {
            //执行这条命令
        response, err = p.Exec(client, params)
    }
}

后面的Exec对于PUB命令来说就是下面的函数:

func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
    //get一下topic,如果没有会自动创建,并且开启topic的消息循环,开始从lookupd同步消息
    topic := p.ctx.nsqd.GetTopic(topicName)
    msg := NewMessage(topic.GenerateID(), messageBody)
    err = topic.PutMessage(msg)
}

2. topic.PutMessage 到 topic.memoryMsgChan;

topic.PutMessage函数实际上就是topic.put, 后者功能很简单,将消息放入t.memoryMsgChan或者磁盘后就返回了。客户端流程结束

func (t *Topic) put(m *Message) error {
    select {
        //将这条消息直接塞入内存管道
    case t.memoryMsgChan <- m:
    default://如果内存消息管道满了(memoryMsgChan的容量由 getOpts().MemQueueSize设置),那么就放入到后面的持久化存储里面
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}

3 每topic消息有后台协程topic.messagePump进行Topic.memoryMsgChan监听;

先来看一下每个topic创建的时候,是怎么创建后台协程的;

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
    //初始化一个topic结构,并且设置其backend持久化结构,然后开启消息监听协程messagePump, 通知lookupd
    //异步开启消息循环,这是最重要的异步,开启了一个新的携程处理
    t.waitGroup.Wrap(func() { t.messagePump() })

    //通知lookupd有新的topic产生了
    t.ctx.nsqd.Notify(t)

topic.messagePump启动后干了什么事情呢?主要就是订阅了内存和磁盘队列消息,这样在上面第二步“topic.PutMessage 到 topic.memoryMsgChan”之后,下面的select就会检测到有消息到来:

func (t *Topic) messagePump() {
    if len(chans) > 0 {//topic当前有channel,那接下来监听内存和磁盘channel的消息。
        //这里想到GetTopic后面去获取lookupdHTTPAddrs的原因,其实是为了尽量别丢消息
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }
    for {
        select {
        case msg = <-memoryMsgChan://内存队列
        case buf = <-backendChan:

4 topic.messagePump 遍历每个channel调用channel.PutMessage发送消息到channel.memoryMsgChan ;

接下来好办了,遍历t.channelMap 然后将消息一个个发送到channel的流程里面:

        //到这里只有一种可能,有新消息来了, 那么遍历channel,调用PutMessage发送消息
        for i, channel := range chans {
            chanMsg := msg
            if chanMsg.deferred != 0 {
                //如果是defered延迟投递的消息,那么放入特殊的队列
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            //立即投递到channel里面
            err := channel.PutMessage(chanMsg)

后面channel发送消息也很简单,PutMessage实际上就是channel.put函数:

func (c *Channel) put(m *Message) error {
    //channel的putmessage,还是老办法,将消息放入channel.memoryMsgChan里面,
    //或者放到后台持久化里面,如果客户端来不及接受的话, 那就存入文件
    //这里客户端是如何接收到消息的呢?可以看SUB命令了. 
    //sub命令最后会调用到client.SubEventChan <- channel, 也就是说,会在这个客户端对应的消息循环里面记录这个channel.memoryMsgChan
    //并且监听他,任何客户端SUB到某个channel后,其消息循环便会订阅到对应这个channel的memoryMsgChan上面,
    //所以同一个channel,客户端随机有一个能收到消息. 这里我们知道,channel的消息发送也是通过管道,                               
    //而管道的另一端,则是所有订阅到这上面的client的消息处理协程
    select {
    case c.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, c.backend)
        bufferPoolPut(b)
        c.ctx.nsqd.SetHealth(err)
    }
    return nil
}

channel.PutMessage发送消息到channel.memoryMsgChan 之后,这个memoryMsgChan的消费端其实是每一个订阅上来的客户端。也就是说,channel后面的客户端互相竞争这个条消息,谁拿到就谁处理,也只能有一个会得到这条消息,具体流程在下一篇文章介绍。


简单总结一下:

1. nsqd每个连接上来的客户端有一个协程IOLoop循环读取消息,还有一个协程负责订阅channel的消息管道;

2. 每一个topic有一个后台协程进行消息的发送和其他处理;

3. 客户端发送消息时是在IOLoop里面把消息放入topic.memoryMsgChan的,之后就由这个topic的messagePump来处理消息;

4. 每个channel没有单独的协程负责后面的处理,channel发送消息其实就只需要把消息放到channel.memoryMsgChan就行;

篇幅太长后面的消费流程下篇文章写;

Share
分类: GO, nsqd, nsqd 标签: , ,
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。