首页 > GO, nsqd, nsqd > nsqd 源码分析(3)- Channel实现原理

nsqd 源码分析(3)- Channel实现原理

2018年5月27日 发表评论 阅读评论 30945次阅读    

写一下Channel的实现,比topic相对简单一些,但channel是最接近消费者端的,有他特有的东西,包括投递,确认等;
nsqd的channel的作用在于能对指定的队列topic,进行多份投递,或者说消费,一份队列可以给多个消费者重复消费,有点类似于kafka的consumer_group,每个consumer_group拥有独立的position, 不同group之间可以消费同一份内容的多个副本。
这里稍微做个对比,kafka的已经消费国的消息是可以继续保留的,而nsq则如果消费完了,就会删除掉,也就是没有position可以供回溯,或者重复消费,只要客户端发发送FIN后,消息就会销毁,也不会保留在磁盘上面,这点比kafka简单多了,当然,轻量级消息队列也不太需要这么复杂的功能。

下面分几个方面讲一下channel的实现,代码相对简单,可能复杂的地方在于怎么跟topic以及client协同了。


零、channel结构体

先来看第一部分,基本的数据结构,channel结构保存了接收到的消息数,超时数等一些统计数据,用处不是太大。其次保存了所属的topicname,channel name,以及顶层的context,其实就是NSQD结构。

type Channel struct {
    // 64bit atomic vars need to be first for proper alignment on 32bit platforms
    requeueCount uint64 
    messageCount uint64
    timeoutCount uint64

    topicName string
    name      string
    ctx       *context
    backend BackendQueue //磁盘持久化存储
    memoryMsgChan chan *Message //channel的消息管道,topic发送消息会放入这里面,所有SUB的客户端会用后台协程订阅到这个管道后面, 1:
n
    exitFlag      int32
    exitMutex     sync.RWMutex
    clients        map[int64]Consumer  //所有订阅的topic都会记录到这里, 用来在关闭的时候清理client,以及根据clientid找client 
    paused         int32
    ephemeral      bool
    deleteCallback func(*Channel) //实际上就是DeleteExistingChannel 
    deleter        sync.Once

    // Stats tracking
    e2eProcessingLatencyStream *quantile.Quantile

channel最重要的几个结构应该是跟延迟投递消息,以及可靠性保证的消息的相关数据结构。两者基本类似,都是用优先级队列实现的。nsq支持塞入队列的时候指定生效时间,支持超时到多久之后生效。这块实现后面具体介绍。字段含义看代码注释:

    // TODO: these can be DRYd up
    //延迟投递消息,消息体会放入deferredPQ,并且由后台的queueScanLoop协程来扫描消息,
    //将过期的消息照常使用c.put(msg)发送出去
    deferredMessages map[MessageID]*pqueue.Item
    deferredPQ       pqueue.PriorityQueue //延迟投递消息的优先级队列
    deferredMutex    sync.Mutex

    //正在发送中的消息记录,直到收到客户端的FIN才会删除,否则timeout到来会重传消息的.
    //这里应用层有个坑,如果程序处理延迟了,那么可能重复投递,那怎么办, 
    //应用层得注意这个,设置了timeout就得接受有重传的存在
    inFlightMessages map[MessageID]*Message
    inFlightPQ       inFlightPqueue
    inFlightMutex    sync.Mutex
}

一、创建channel

channel的创建有多个时机,消费者在订阅一个channel的时候,或者从lookupd上加载一个topic后,都会创建channel, 基本上都是在获取的时候自动创建,函数GetChannel()->getOrCreateChannel(), 如果是新创建了一个channel,那么会通知topic的messagePump 更新订阅发布事件,如果topic上面一个topic都没有的话,就不需要监听topic的消息了。

func (t *Topic) GetChannel(channelName string) *Channel {
    //获取topic的channel,如果之前没有是新建的,则通知channelUpdateChan 去刷新订阅状态
    t.Lock()
    channel, isNew := t.getOrCreateChannel(channelName)
    t.Unlock()
    if isNew {
        // update messagePump state
        select {
            //通知去刷新订阅状态,如果没有channel了就不用发布了
        case t.channelUpdateChan <- 1://另一端是(t *Topic) messagePump
        case <-t.exitChan:
        }
    }
    return channel
}

后者getOrCreateChannel 代码如下,调用NewChannel创建一个channel,比较简单。

func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
    //获取一个channel,如果没有就新建它
    //调用方已经对topic加锁了t.Lock(), 所以不需要加锁
    channel, ok := t.channelMap[channelName]
    if !ok {
        deleteCallback := func(c *Channel) {
            t.DeleteExistingChannel(c.name)
        }
        //不存在,初始化一个channel,设置持久化结构等
        channel = NewChannel(t.name, channelName, t.ctx, deleteCallback)
        t.channelMap[channelName] = channel
        t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name)
        return channel, true
    }
    return channel, false
}

具体的NewChannel新建流程比较简单,也没有topic那种创建后端异步队列的流程。这点channel跟topic不一样,每个topic会有一个后台协程来负责真正消息的发送。而channel的消息发送其实是在topic协程里面就直接发送到channel.memoryMsgChan 或者 backend了,而后台协程是谁呢?其实是各个SUB订阅的消费者,他们会在另外一端监听channel的消息。


二、SUB订阅channel消息

SUB订阅消息仅限于TCP协议,客户端通过SUB命令订阅上来,最后调用cannnel.AddClient函数,函数很简单,就 在channel上记录了一下当前clientid,以备后面进行清理等使用。

// AddClient adds a client to the Channel's client list
func (c *Channel) AddClient(clientID int64, client Consumer) {
    //sub命令触发到这里,在channel上面增加一个client
    c.Lock()
    defer c.Unlock()

    _, ok := c.clients[clientID]
    if ok {
        return
    }   
    c.clients[clientID] = client
}

另外clients还有一个作用,就是如果topic属于临时队列,不需要保存历史痕迹的,如果所有消费者都已经退出后,这会删除channel,进而如果所有channel都关闭了,就会删除上层的topic,具体如下:

func (c *Channel) RemoveClient(clientID int64) {
    c.Lock()
    defer c.Unlock()

    _, ok := c.clients[clientID]
    if !ok {
        return
    }   
    delete(c.clients, clientID)
    if len(c.clients) == 0 && c.ephemeral == true {
    //所有client都退出了,如果是临时的ephemeral topic,就会删除这个channel,
    // 实际上就是DeleteExistingChannel 
        go c.deleter.Do(func() { c.deleteCallback(c) })
    }
}

channel的delete函数进而会删除上层topic的结构,如果一个channel都没有了的话:

func (t *Topic) DeleteExistingChannel(channelName string) error {
    delete(t.channelMap, channelName)
    channel.Delete()
    if numChannels == 0 && t.ephemeral == true {
        go t.deleter.Do(func() { t.deleteCallback(t) })
    }
    return nil
}

三、PUB发送消息到channel - PutMessage

客户端PUB发送消息后,会经由topic.messagePump有消息时循环每一个channel,调用channel.PutMessage发送消息给后面的消费者,实际上调用的是channel.put函数。
channel的putmessage,还是老办法,将消息放入channel.memoryMsgChan里面,或者放到后台持久化里面,如果客户端来不及接受的话, 那就存入文件。

这里里客户端是如何接收到消息的呢?可以看SUB命令了. sub命令最后会调用到client.SubEventChan <- channel, 也就是说,会在这个客户端对应的消息循环里面记录这个channel.memoryMsgChan 并且监听他,任何客户端SUB到某个channel后,其消息循环便会订阅到对应这个channel的memoryMsgChan上面, 所以同一个channel,客户端随机有一个能收到消息. 这里我们知道,channel的消息发送也是通过管道,而管道的另一端,则是所有订阅到这上面的client的消息处理协程。

func (c *Channel) put(m *Message) error {
    select {
    case c.memoryMsgChan &lt;- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, c.backend)
        bufferPoolPut(b)
        c.ctx.nsqd.SetHealth(err)
    }
    return nil
}

四、defer延迟投递消息

nsqd 也支持延迟投递消息,类似beanstalk的设置消息的超时时间,等到达时间后消息可以背消费者读取到。在实际应用中也会有不错的场景可以使用到;
defer消息的发送方法跟PUB就差一个超时时间了,大于零代表是defer消息。实现函数不是PutMessage,而是PutMessageDeferred ,后者调用StartDeferredTimeout来实现真正的逻辑:算绝对时间absTs, 然后新建一个pqueue.Item放到优先级队列里面,优先级就是absTs,按照时间顺序排列;

func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) {
    //延迟投递消息,也算一个messageCount。 参数timeout是延迟秒数
    atomic.AddUint64(&amp;c.messageCount, 1)
    c.StartDeferredTimeout(msg, timeout)
}

func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
    //延迟投递消息,计算绝对时间absTs, 然后新建一个pqueue.Item放到优先级队列里面,优先级就是absTs,按照时间顺序排列
    absTs := time.Now().Add(timeout).UnixNano()
    item := &amp;pqueue.Item{Value: msg, Priority: absTs}
    //下面就是记录一下deferredMessages[id] = item,房子重复延迟投递
    err := c.pushDeferredMessage(item)
    c.addToDeferredPQ(item)
    return nil
}

可以看到,对于延迟投递的消息最后就是用消息到达的时间戳来当优先级值,放入优先级队列。
放入到优先级队列里面. 那么这条消息怎么触发呢? 在main函数里面,会启动queueScanLoop协程,后者会定时启动扫描任务扫描所有channels, 然后去处理这个优先级队列,调用processDeferredQueue 函数,如果有到期的消息就触发他;
来看看processDeferredQueue 函数代码, 起主要逻辑就是从优先级队列里面循环读取已到期的消息,然后将其重新调用put函数发送出去,跟客户端PUB是一个效果:

func (c *Channel) processDeferredQueue(t int64) bool {
    //被queueScanLoop调用来扫描是否有到期的延迟投递消息 
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock() 
    dirty := false
    for {
        //循环处理到期消息,这里如果有大量消息没有处理,就会死循环一直到处理完毕为止
        //目测不是太好,如果业务设置某个时间到期的消息,基本上nsqd应该会卡的不行了
        //会不断在这里处理这些消息,当然对于服务是还能响应的
        c.deferredMutex.Lock()
        //如果有取一条的到期消息
        item, _ := c.deferredPQ.PeekAndShift(t)
        c.deferredMutex.Unlock()
        dirty = true

        msg := item.Value.(*Message)
        //从c.deferredMessages 删除
        _, err := c.popDeferredMessage(msg.ID)
        //调用常规发送消息的put函数去照常投递消息
        c.put(msg)
    }
exit:
    return dirty
}

五、FIN 消息的确认到达

nsqd 对于消息的确认到达,是通过消费者发送FIN+msgid来实现的,可想而知,他需要一个队列记录:当前正在发送,待确认的消息列表,类似窗口协议,性能差点可能,这就是:InFlightQueue 。
次发送客户端消息的时候,会调用StartInFlightTimeout来记录当前的infight消息,以备进行重传。如果客户端收到消息,会发送FIN 命令来结束消息倒计时,不用重传了。简单看看StartInFlightTimeout 代码。
这个函数会在客户端的protocolV2) messagePump 循环体里面,当某个客户端连接得到内存或者磁盘消息后,在发送前会设置这个inflight队列,用来记录当前正在发送的消息,如果超时时间到来还没有确认收到,那么会有queueScanLoop循环去扫描,并且重发这条消息, 最后会调用到processInFlightQueue

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
    //这个函数会在客户端的protocolV2) messagePump 循环体里面,当某个客户端连接得到内存或者磁盘消息后,                           
    //在发送前会设置这个inflight队列,用来记录当前正在发送的消息,如果超时时间到来还没有确认收到,
    //那么会有queueScanLoop循环去扫描,并且重发这条消息, 最后会调用到processInFlightQueue
    now := time.Now()
    //下面改clientID会不会有问题?不会,因为一个消息只可能给一个客户端,topic在发送消息到channel的时候
    //第0个channel会复用当初的msg结构,之后的会创建一个新的
    msg.clientID = clientID
    msg.deliveryTS = now
    msg.pri = now.Add(timeout).UnixNano()
    err := c.pushInFlightMessage(msg)
    c.addToInFlightPQ(msg)
    return nil
}

到这里channel的主要功能差不多了,想对实现比较简单,主要就是实现了channel的延迟投递功能,以及消息的确认到达功能。后面在记录一下这整个流程是怎么串起来的,nsqd的消息发送流程,从生产者PUB开始,到消费者SUB收到消息后FIN结束。

Share
分类: GO, nsqd, nsqd 标签: , ,
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。