nsqd 源码分析(4)- 消息的发送流程
本文从消息发送者的PUB到最后消息被SUB接收,整个流程串起来讲一下nsqd是怎么接收消息的,这部分先写发送流程。
- 生产者PUB topic消息;
- topic.PutMessage 到 topic.memoryMsgChan;
- 每topic消息有后台协程topic.messagePump进行Topic.memoryMsgChan监听;
- topic.messagePump 遍历每个channel调用channel.PutMessage发送消息到channel.memoryMsgChan ;
生产者流程:
下面具体分析每个步骤的关键代码,为了简化只摘抄了关键部分的代码,其余去掉了:
1 生产者PUB topic消息;
每一个生产者都会单独创建一个IOLoop协程,进行同步(实际上异步)的处理,首先会创建messagePump协程做后台订阅,发送任务。
func (p *protocolV2) IOLoop(conn net.Conn) error { //nsqd/tcp.go 的tcpServer.Handle 在读取前面4个字节的版本后调用这里,开始去读取客户端请求然后处理的过程, clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) //创建client结构 client := newClientV2(clientID, conn, p.ctx) //上面注释说了,这里做同步的原因在于,需要确保messagePump 里面的初始化完成在进行下面的操作, //因为当前客户端在后面可能修改相关的数据。 //消息的订阅发布工作在辅助的messagePump携程处理,下面创建之 messagePumpStartedChan := make(chan bool) go p.messagePump(client, messagePumpStartedChan) <-messagePumpStartedChan //开始循环读取客户端请求然后解析参数,进行处理, 这个工作在客户端的主协程处理 for { //执行这条命令 response, err = p.Exec(client, params) } }
后面的Exec对于PUB命令来说就是下面的函数:
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { //get一下topic,如果没有会自动创建,并且开启topic的消息循环,开始从lookupd同步消息 topic := p.ctx.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) err = topic.PutMessage(msg) }
2. topic.PutMessage 到 topic.memoryMsgChan;
topic.PutMessage函数实际上就是topic.put, 后者功能很简单,将消息放入t.memoryMsgChan或者磁盘后就返回了。客户端流程结束
func (t *Topic) put(m *Message) error { select { //将这条消息直接塞入内存管道 case t.memoryMsgChan <- m: default://如果内存消息管道满了(memoryMsgChan的容量由 getOpts().MemQueueSize设置),那么就放入到后面的持久化存储里面 b := bufferPoolGet() err := writeMessageToBackend(b, m, t.backend) bufferPoolPut(b) t.ctx.nsqd.SetHealth(err) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to write message to backend - %s", t.name, err) return err } } return nil }
3 每topic消息有后台协程topic.messagePump进行Topic.memoryMsgChan监听;
先来看一下每个topic创建的时候,是怎么创建后台协程的;
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic { //初始化一个topic结构,并且设置其backend持久化结构,然后开启消息监听协程messagePump, 通知lookupd //异步开启消息循环,这是最重要的异步,开启了一个新的携程处理 t.waitGroup.Wrap(func() { t.messagePump() }) //通知lookupd有新的topic产生了 t.ctx.nsqd.Notify(t)
topic.messagePump启动后干了什么事情呢?主要就是订阅了内存和磁盘队列消息,这样在上面第二步“topic.PutMessage 到 topic.memoryMsgChan”之后,下面的select就会检测到有消息到来:
func (t *Topic) messagePump() { if len(chans) > 0 {//topic当前有channel,那接下来监听内存和磁盘channel的消息。 //这里想到GetTopic后面去获取lookupdHTTPAddrs的原因,其实是为了尽量别丢消息 memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } for { select { case msg = <-memoryMsgChan://内存队列 case buf = <-backendChan:
4 topic.messagePump 遍历每个channel调用channel.PutMessage发送消息到channel.memoryMsgChan ;
接下来好办了,遍历t.channelMap 然后将消息一个个发送到channel的流程里面:
//到这里只有一种可能,有新消息来了, 那么遍历channel,调用PutMessage发送消息 for i, channel := range chans { chanMsg := msg if chanMsg.deferred != 0 { //如果是defered延迟投递的消息,那么放入特殊的队列 channel.PutMessageDeferred(chanMsg, chanMsg.deferred) continue } //立即投递到channel里面 err := channel.PutMessage(chanMsg)
后面channel发送消息也很简单,PutMessage实际上就是channel.put函数:
func (c *Channel) put(m *Message) error { //channel的putmessage,还是老办法,将消息放入channel.memoryMsgChan里面, //或者放到后台持久化里面,如果客户端来不及接受的话, 那就存入文件 //这里客户端是如何接收到消息的呢?可以看SUB命令了. //sub命令最后会调用到client.SubEventChan <- channel, 也就是说,会在这个客户端对应的消息循环里面记录这个channel.memoryMsgChan //并且监听他,任何客户端SUB到某个channel后,其消息循环便会订阅到对应这个channel的memoryMsgChan上面, //所以同一个channel,客户端随机有一个能收到消息. 这里我们知道,channel的消息发送也是通过管道, //而管道的另一端,则是所有订阅到这上面的client的消息处理协程 select { case c.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b, m, c.backend) bufferPoolPut(b) c.ctx.nsqd.SetHealth(err) } return nil }
channel.PutMessage发送消息到channel.memoryMsgChan 之后,这个memoryMsgChan的消费端其实是每一个订阅上来的客户端。也就是说,channel后面的客户端互相竞争这个条消息,谁拿到就谁处理,也只能有一个会得到这条消息,具体流程在下一篇文章介绍。
简单总结一下:
1. nsqd每个连接上来的客户端有一个协程IOLoop循环读取消息,还有一个协程负责订阅channel的消息管道;
2. 每一个topic有一个后台协程进行消息的发送和其他处理;
3. 客户端发送消息时是在IOLoop里面把消息放入topic.memoryMsgChan的,之后就由这个topic的messagePump来处理消息;
4. 每个channel没有单独的协程负责后面的处理,channel发送消息其实就只需要把消息放到channel.memoryMsgChan就行;
篇幅太长后面的消费流程下篇文章写;
近期评论