nsqd 源码分析(3)- Channel实现原理
写一下Channel的实现,比topic相对简单一些,但channel是最接近消费者端的,有他特有的东西,包括投递,确认等;
nsqd的channel的作用在于能对指定的队列topic,进行多份投递,或者说消费,一份队列可以给多个消费者重复消费,有点类似于kafka的consumer_group,每个consumer_group拥有独立的position, 不同group之间可以消费同一份内容的多个副本。
这里稍微做个对比,kafka的已经消费国的消息是可以继续保留的,而nsq则如果消费完了,就会删除掉,也就是没有position可以供回溯,或者重复消费,只要客户端发发送FIN后,消息就会销毁,也不会保留在磁盘上面,这点比kafka简单多了,当然,轻量级消息队列也不太需要这么复杂的功能。
下面分几个方面讲一下channel的实现,代码相对简单,可能复杂的地方在于怎么跟topic以及client协同了。
零、channel结构体
先来看第一部分,基本的数据结构,channel结构保存了接收到的消息数,超时数等一些统计数据,用处不是太大。其次保存了所属的topicname,channel name,以及顶层的context,其实就是NSQD结构。
type Channel struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms requeueCount uint64 messageCount uint64 timeoutCount uint64 topicName string name string ctx *context backend BackendQueue //磁盘持久化存储 memoryMsgChan chan *Message //channel的消息管道,topic发送消息会放入这里面,所有SUB的客户端会用后台协程订阅到这个管道后面, 1: n exitFlag int32 exitMutex sync.RWMutex clients map[int64]Consumer //所有订阅的topic都会记录到这里, 用来在关闭的时候清理client,以及根据clientid找client paused int32 ephemeral bool deleteCallback func(*Channel) //实际上就是DeleteExistingChannel deleter sync.Once // Stats tracking e2eProcessingLatencyStream *quantile.Quantile
channel最重要的几个结构应该是跟延迟投递消息,以及可靠性保证的消息的相关数据结构。两者基本类似,都是用优先级队列实现的。nsq支持塞入队列的时候指定生效时间,支持超时到多久之后生效。这块实现后面具体介绍。字段含义看代码注释:
// TODO: these can be DRYd up //延迟投递消息,消息体会放入deferredPQ,并且由后台的queueScanLoop协程来扫描消息, //将过期的消息照常使用c.put(msg)发送出去 deferredMessages map[MessageID]*pqueue.Item deferredPQ pqueue.PriorityQueue //延迟投递消息的优先级队列 deferredMutex sync.Mutex //正在发送中的消息记录,直到收到客户端的FIN才会删除,否则timeout到来会重传消息的. //这里应用层有个坑,如果程序处理延迟了,那么可能重复投递,那怎么办, //应用层得注意这个,设置了timeout就得接受有重传的存在 inFlightMessages map[MessageID]*Message inFlightPQ inFlightPqueue inFlightMutex sync.Mutex }
一、创建channel
channel的创建有多个时机,消费者在订阅一个channel的时候,或者从lookupd上加载一个topic后,都会创建channel, 基本上都是在获取的时候自动创建,函数GetChannel()->getOrCreateChannel(), 如果是新创建了一个channel,那么会通知topic的messagePump 更新订阅发布事件,如果topic上面一个topic都没有的话,就不需要监听topic的消息了。
func (t *Topic) GetChannel(channelName string) *Channel { //获取topic的channel,如果之前没有是新建的,则通知channelUpdateChan 去刷新订阅状态 t.Lock() channel, isNew := t.getOrCreateChannel(channelName) t.Unlock() if isNew { // update messagePump state select { //通知去刷新订阅状态,如果没有channel了就不用发布了 case t.channelUpdateChan <- 1://另一端是(t *Topic) messagePump case <-t.exitChan: } } return channel }
后者getOrCreateChannel 代码如下,调用NewChannel创建一个channel,比较简单。
func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) { //获取一个channel,如果没有就新建它 //调用方已经对topic加锁了t.Lock(), 所以不需要加锁 channel, ok := t.channelMap[channelName] if !ok { deleteCallback := func(c *Channel) { t.DeleteExistingChannel(c.name) } //不存在,初始化一个channel,设置持久化结构等 channel = NewChannel(t.name, channelName, t.ctx, deleteCallback) t.channelMap[channelName] = channel t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name) return channel, true } return channel, false }
具体的NewChannel新建流程比较简单,也没有topic那种创建后端异步队列的流程。这点channel跟topic不一样,每个topic会有一个后台协程来负责真正消息的发送。而channel的消息发送其实是在topic协程里面就直接发送到channel.memoryMsgChan 或者 backend了,而后台协程是谁呢?其实是各个SUB订阅的消费者,他们会在另外一端监听channel的消息。
二、SUB订阅channel消息
SUB订阅消息仅限于TCP协议,客户端通过SUB命令订阅上来,最后调用cannnel.AddClient函数,函数很简单,就 在channel上记录了一下当前clientid,以备后面进行清理等使用。
// AddClient adds a client to the Channel's client list func (c *Channel) AddClient(clientID int64, client Consumer) { //sub命令触发到这里,在channel上面增加一个client c.Lock() defer c.Unlock() _, ok := c.clients[clientID] if ok { return } c.clients[clientID] = client }
另外clients还有一个作用,就是如果topic属于临时队列,不需要保存历史痕迹的,如果所有消费者都已经退出后,这会删除channel,进而如果所有channel都关闭了,就会删除上层的topic,具体如下:
func (c *Channel) RemoveClient(clientID int64) { c.Lock() defer c.Unlock() _, ok := c.clients[clientID] if !ok { return } delete(c.clients, clientID) if len(c.clients) == 0 && c.ephemeral == true { //所有client都退出了,如果是临时的ephemeral topic,就会删除这个channel, // 实际上就是DeleteExistingChannel go c.deleter.Do(func() { c.deleteCallback(c) }) } }
channel的delete函数进而会删除上层topic的结构,如果一个channel都没有了的话:
func (t *Topic) DeleteExistingChannel(channelName string) error { delete(t.channelMap, channelName) channel.Delete() if numChannels == 0 && t.ephemeral == true { go t.deleter.Do(func() { t.deleteCallback(t) }) } return nil }
三、PUB发送消息到channel - PutMessage
客户端PUB发送消息后,会经由topic.messagePump有消息时循环每一个channel,调用channel.PutMessage发送消息给后面的消费者,实际上调用的是channel.put函数。
channel的putmessage,还是老办法,将消息放入channel.memoryMsgChan里面,或者放到后台持久化里面,如果客户端来不及接受的话, 那就存入文件。
这里里客户端是如何接收到消息的呢?可以看SUB命令了. sub命令最后会调用到client.SubEventChan <- channel, 也就是说,会在这个客户端对应的消息循环里面记录这个channel.memoryMsgChan 并且监听他,任何客户端SUB到某个channel后,其消息循环便会订阅到对应这个channel的memoryMsgChan上面, 所以同一个channel,客户端随机有一个能收到消息. 这里我们知道,channel的消息发送也是通过管道,而管道的另一端,则是所有订阅到这上面的client的消息处理协程。
func (c *Channel) put(m *Message) error { select { case c.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b, m, c.backend) bufferPoolPut(b) c.ctx.nsqd.SetHealth(err) } return nil }
四、defer延迟投递消息
nsqd 也支持延迟投递消息,类似beanstalk的设置消息的超时时间,等到达时间后消息可以背消费者读取到。在实际应用中也会有不错的场景可以使用到;
defer消息的发送方法跟PUB就差一个超时时间了,大于零代表是defer消息。实现函数不是PutMessage,而是PutMessageDeferred ,后者调用StartDeferredTimeout来实现真正的逻辑:算绝对时间absTs, 然后新建一个pqueue.Item放到优先级队列里面,优先级就是absTs,按照时间顺序排列;
func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) { //延迟投递消息,也算一个messageCount。 参数timeout是延迟秒数 atomic.AddUint64(&c.messageCount, 1) c.StartDeferredTimeout(msg, timeout) } func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error { //延迟投递消息,计算绝对时间absTs, 然后新建一个pqueue.Item放到优先级队列里面,优先级就是absTs,按照时间顺序排列 absTs := time.Now().Add(timeout).UnixNano() item := &pqueue.Item{Value: msg, Priority: absTs} //下面就是记录一下deferredMessages[id] = item,房子重复延迟投递 err := c.pushDeferredMessage(item) c.addToDeferredPQ(item) return nil }
可以看到,对于延迟投递的消息最后就是用消息到达的时间戳来当优先级值,放入优先级队列。
放入到优先级队列里面. 那么这条消息怎么触发呢? 在main函数里面,会启动queueScanLoop协程,后者会定时启动扫描任务扫描所有channels, 然后去处理这个优先级队列,调用processDeferredQueue 函数,如果有到期的消息就触发他;
来看看processDeferredQueue 函数代码, 起主要逻辑就是从优先级队列里面循环读取已到期的消息,然后将其重新调用put函数发送出去,跟客户端PUB是一个效果:
func (c *Channel) processDeferredQueue(t int64) bool { //被queueScanLoop调用来扫描是否有到期的延迟投递消息 c.exitMutex.RLock() defer c.exitMutex.RUnlock() dirty := false for { //循环处理到期消息,这里如果有大量消息没有处理,就会死循环一直到处理完毕为止 //目测不是太好,如果业务设置某个时间到期的消息,基本上nsqd应该会卡的不行了 //会不断在这里处理这些消息,当然对于服务是还能响应的 c.deferredMutex.Lock() //如果有取一条的到期消息 item, _ := c.deferredPQ.PeekAndShift(t) c.deferredMutex.Unlock() dirty = true msg := item.Value.(*Message) //从c.deferredMessages 删除 _, err := c.popDeferredMessage(msg.ID) //调用常规发送消息的put函数去照常投递消息 c.put(msg) } exit: return dirty }
五、FIN 消息的确认到达
nsqd 对于消息的确认到达,是通过消费者发送FIN+msgid来实现的,可想而知,他需要一个队列记录:当前正在发送,待确认的消息列表,类似窗口协议,性能差点可能,这就是:InFlightQueue 。
次发送客户端消息的时候,会调用StartInFlightTimeout来记录当前的infight消息,以备进行重传。如果客户端收到消息,会发送FIN 命令来结束消息倒计时,不用重传了。简单看看StartInFlightTimeout 代码。
这个函数会在客户端的protocolV2) messagePump 循环体里面,当某个客户端连接得到内存或者磁盘消息后,在发送前会设置这个inflight队列,用来记录当前正在发送的消息,如果超时时间到来还没有确认收到,那么会有queueScanLoop循环去扫描,并且重发这条消息, 最后会调用到processInFlightQueue
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error { //这个函数会在客户端的protocolV2) messagePump 循环体里面,当某个客户端连接得到内存或者磁盘消息后, //在发送前会设置这个inflight队列,用来记录当前正在发送的消息,如果超时时间到来还没有确认收到, //那么会有queueScanLoop循环去扫描,并且重发这条消息, 最后会调用到processInFlightQueue now := time.Now() //下面改clientID会不会有问题?不会,因为一个消息只可能给一个客户端,topic在发送消息到channel的时候 //第0个channel会复用当初的msg结构,之后的会创建一个新的 msg.clientID = clientID msg.deliveryTS = now msg.pri = now.Add(timeout).UnixNano() err := c.pushInFlightMessage(msg) c.addToInFlightPQ(msg) return nil }
到这里channel的主要功能差不多了,想对实现比较简单,主要就是实现了channel的延迟投递功能,以及消息的确认到达功能。后面在记录一下这整个流程是怎么串起来的,nsqd的消息发送流程,从生产者PUB开始,到消费者SUB收到消息后FIN结束。
近期评论