首页 > GO, nsqd, nsqd > nsqd 源码分析(2)- Topic实现原理

nsqd 源码分析(2)- Topic实现原理

2018年5月22日 发表评论 阅读评论 30113次阅读    

这回记录一下nsqd的topic是怎么实现的。先以官网一张著名的动图开始, 下面这图介绍了发送一条消息到一个topic上的流程:

从上图可以看出,nsq的一个topic对应这多个自定义命名的channel, channel后面可以订阅很多个消费者,也就是我们的消费程序了;这中间有个地方要注意,同一个channel后面的消费者,一条消息之后到一个消费者上,但是不同channel的消费者都可以收到,也就是类似于kafka的group一个概念。
具体的不多介绍了,这里只讲实现原理。


一、Topic数据结构

下面是Topic的数据结构(去掉了不重要的变量),所有topic列表会存储在NSQD.topicMap[]里面。
可以看出,最主要的变量在于channelMap,这是这个topic的所有channel结构。memoryMsgChan 是这个topic对应的内存管道, backend是对应的持久化磁盘存储的管道。

type Topic struct {
    name              string
    channelMap        map[string]*Channel
    backend           BackendQueue
    memoryMsgChan     chan *Message
    channelUpdateChan chan int 
    exitFlag          int32 //这个标志是在删除一个topic时,先设置这个,然后在进行真实删除的。PutMessage的时候也会检查这状态,如果正在进行topic删除,那直接返回不入队
    ephemeral      bool //临时topic以#开头,这样的topic不会写到磁盘上,并且放入内存队列后会丢掉,最后一个channel删除后也会删除这>
个topic,没有持久化
    deleteCallback func(*Topic) //topic删除函数,其实是DeleteExistingTopic, 只有在DeleteExistingChannel里面,topic是ephemeral且最后一个channel也删除后会调用
    ctx *context
}

上面的ephemeral变量特殊说明一下, 临时topic以#ephemeral开头,这样的topic不会写到磁盘上,并且放入内存队列后会丢掉,最后一个channel删除后也会删除这个topic,没有持久化功能。
下面简单画了一下topic的数据结构流向:


二、如何新建一个topic

新建topic的函数为NewTopic, 新建topic发生的时机在哪些地方呢?
1. loadMetaData加载磁盘文件nsqd.data时,会先创建所有之前的topic;
2. GetTopic 时如果topic不存在,会自动创建一个;
新建topic的步骤为,先new 一个Topic 结构,初始化各项变量;这里如果是临时topic以#ephemeral开头,没有持久化机制,只放入内存中,所以其backend其实是个黑洞,直接丢掉。
如果是正常topic,需要设置其log函数,以及最重要的,backend持久化机制。具体逻辑后面的文章在细化。
初始化变量之后就是最重要的事情了:启动消息协程。最后通知lookupd有新的topic产生。具体如下,可以对应看注释:

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
    //初始化一个topic结构,并且设置其backend持久化结构,然后开启消息监听协程messagePump, 通知lookupd
    t := &Topic{
        name:              topicName,
        channelMap:        make(map[string]*Channel),                                                                            
        memoryMsgChan:     make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
        exitChan:          make(chan int),
        channelUpdateChan: make(chan int),
        ctx:               ctx,
        pauseChan:         make(chan bool),
        deleteCallback:    deleteCallback, //topic删除函数,其实是DeleteExistingTopic
        idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),
    }

    if strings.HasSuffix(topicName, "#ephemeral") {
        //临时topic以#ephemeral开头,没有持久化机制,只放入内存中,所以其backend其实是个黑洞,直接丢掉
        t.ephemeral = true
        t.backend = newDummyBackendQueue()
    } else {
        //正常的topic,需要设置其log函数,以及最重要的,backend持久化机制
        dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
            opts := ctx.nsqd.getOpts()
            lg.Logf(opts.Logger, opts.logLevel, lg.LogLevel(level), f, args...)
        }
        //下面初始化一下持久化的diskqueue数据结构, 传入路径和文件大小相关的参数,以及sync刷磁盘的配置
        t.backend = diskqueue.New(
            topicName,
            ctx.nsqd.getOpts().DataPath,
            ctx.nsqd.getOpts().MaxBytesPerFile,
            int32(minValidMsgLength),
            int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
            ctx.nsqd.getOpts().SyncEvery,
            ctx.nsqd.getOpts().SyncTimeout,
            dqLogf,
        )
    }      
    //异步开启消息循环,这是最重要的异步,开启了一个新的携程处理
    t.waitGroup.Wrap(func() { t.messagePump() })

    //通知lookupd有新的topic产生了
    t.ctx.nsqd.Notify(t)

    return t
}

这里关于启动消息协程后面在细化,先讲一下删除topic的流程。


三、删除topic

topic的删除主要是调用Delete或者Close 函数,前者会通知lookupd删除所有的topic信息,后者主要用在关闭nsqd服务的时候,关闭本程序的topic。
消息的删除函数,大概做的事情为:

  1. 通知lookupd删除对应的数据;
  2. 关闭topic.exitChan管道让topic.messagePump退出;
  3. 循环删除其channelMap列表;
  4. 将内存未消费的消息持久化;

Delete实际上调研的是exit()函数,看看代码前半部分实现的功能:

func (t *Topic) exit(deleted bool) error {
    //消息的删除函数,大概做的事情为:1. 通知lookupd; 2. 关闭topic.exitChan管道让topic.messagePump退出; 3. 循环删除其channelMap
列表; 4. 将内存未消费的消息持久化;
    //先判断一下状态看能否继续删除
    if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
        return errors.New("exiting")
    }
    if deleted {
        t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name)

        // since we are explicitly deleting a topic (not just at system exit time)
        // de-register this from the lookupd
        //通知lookupLoop协程进行处理,有增删topic了,需要进行UnRegister 后者 Register topic了
        t.ctx.nsqd.Notify(t)
    } else {
        t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)
    }
    //关闭管道,这样其他在这个topic上的协程就会退出, 比如topic.messagePump 就会退出topic消息循环
    close(t.exitChan)
    //下面其实是在等待启动消息循环的代码退出: t.waitGroup.Wrap(func() { t.messagePump() }), 这样不会再有在这个topic上的操作了
    // synchronize the close of messagePump()
    t.waitGroup.Wait()

最后一步t.waitGroup.Wait() 其实是在等待topic的消息协程退出,因为上面调用了close(t.exitChan), 说消息协程会收到这个消息并且退出协程。因此这里Wait等待协程退出后,就开始清理channel等信息了。接下来准备清空对应channel的数据 , 看是否是要删除topic来决定调用delete还是close, 这个处理类似topic的处理:

    //接下来准备清空对应channel的数据 , 看是否是要删除topic来决定调用delete还是close, 这个处理类似topic的处理
    if deleted {
        t.Lock()
        for _, channel := range t.channelMap {
            delete(t.channelMap, channel.name)
            channel.Delete()
        }
        t.Unlock()
        // empty the queue (deletes the backend files, too)
        t.Empty()
        //然后在通知后面的disqqueue进行清理删除
        return t.backend.Delete()
    }

    // close all the channels
    for _, channel := range t.channelMap {
        err := channel.Close()
    }
    // write anything leftover to disk
    //如果还有内存消息没处理完需要写入后端的持久化设备
    t.flush()
    return t.backend.Close()
} 

四、如何获取Topic

消息获取函数其实是在nsqd.go里面实现的GetTopic , 则函数会先简单获取一把读锁看topic是否已经存在,如果已经存在直接返回,如果不存在就到后面的创建,初始化流程。

先来看一下如果消息存在的情况,nsqd为了降低锁禁止,先获取读锁看一下,然后如果不信,在获取写锁去尝试newtopic。

func (n *NSQD) GetTopic(topicName string) *Topic {
    // most likely, we already have this topic, so try read lock first.
    //如上面所说,大部分情况都是存在topic的,这优化值得
    n.RLock()
    t, ok := n.topicMap[topicName]
    n.RUnlock()
    if ok {
        return t
    }   
    //不存在这topc,得new一个了,  所以直接加锁了整个nsqd结构
    n.Lock()
    t, ok = n.topicMap[topicName]
    if ok { //还有种情况,就在刚才那一瞬间,有其他协程进来了,他new了一个,所以获取锁后还得判断一下是否存在
        n.Unlock()
        return t
    } 

如果topic不存在,需要新建一个。主要工作由NewTopic来完成,上面讲过,创建一个topic结构,并且里面初始化好diskqueue, 加入到NSQD的topicmap里面。创建topic的时候,会开启消息协程。

另外这里值得提的一电视,函数里面的锁替换,刚开始因为要修改NSQD的topicMap结构,所以必须对nsqd加锁,创建完n.topicMap[topicName] = t 赋值后,就不需要加大锁了,所以先对刚刚创建的topic加一把写锁在释放NSQD的读锁。
这里会不会产生死锁呢?答案是不会,具体看下面的代码注释。

    deleteCallback := func(t *Topic) {
        //topic的删除函数
        n.DeleteExistingTopic(t.name)
    }   
    //创建一个topic结构,并且里面初始化好diskqueue, 加入到NSQD的topicmap里面
    //创建topic的时候,会开启消息协程
    t = NewTopic(topicName, &context{n}, deleteCallback)
    n.topicMap[topicName] = t

    // release our global nsqd lock, and switch to a more granular topic lock while we init our
    // channels from lookupd. This blocks concurrent PutMessages to this topic.
    //可以理解为先加锁topic,这个时候会加不上吗?
    //不会,因为我们在获取topic的时候,会先加nsqd的读锁获取topic,然后再第二步进行PutMessage或者DeleteExistingChannel的时候,会加
RLock或者Lock;
    //因此,由于本函数开头已经先加了n.Lock()大锁,所以上面第二步不可能进入,因此下面可以直接加t.Lock而不用担心死锁
    //这里相当于已经创建了topic到nsqd的topicMap,接下来的事情不涉及到nsqd,而只是topic内部的事情了,所以换一把小一点的锁
    t.Lock()
    n.Unlock()

后面的步骤就是加载lookupd里面的channel信息,进行初始化数据结构了。最后释放topic锁, 然后往管道channelUpdateChan里面塞入一个事件,通知topic的后台消息协程去处理channel的变动事件。

    //lookupd里面存储所有之前的channel信息,所以这里加载一下,这样消息能不丢
    // if using lookupd, make a blocking call to get the topics, and immediately create them.
    // this makes sure that any message received is buffered to the right channels
    lookupdHTTPAddrs := n.lookupdHTTPAddrs()
    if len(lookupdHTTPAddrs) > 0 {
        channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
        for _, channelName := range channelNames {
            //临时topic不需要预先创建,用到的时候再创建就行
            if strings.HasSuffix(channelName, "#ephemeral") {
                continue
            }
            //预先创建一个channel,原因呢?为了让消息能够及时的入队.
            //比如,我这个nsq重启了,那么重启的这时刻,需要加载曾经的所有channel,以备每一个channel的消息不丢。不然只能等着对方cr
eate了,不方便
            t.getOrCreateChannel(channelName)
        }
    }
    t.Unlock()
    select {
    case t.channelUpdateChan <- 1:
    case <-t.exitChan:
    }
    return t
}

五、如何往Topic发送消息

消息的发送操作是二进制的PUB或者“/pub?topic=testtopic” 接口,后面其实都是调用的(t *Topic) PutMessage函数去真正发送一条消息到一个topic。

// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
    t.RLock()
    defer t.RUnlock()
    //简单看一下是不是我们正在退出状态,如果是就直接返回
    if atomic.LoadInt32(&t.exitFlag) == 1 {
        return errors.New("exiting")
    }
    err := t.put(m)
    if err != nil {
        return err
    }
    atomic.AddUint64(&t.messageCount, 1)
    return nil
}

可以看到真正的发送消息函数是put, 我们知道topic存储目标有2个,一个原生内存管道memoryMsgChan,另外一个是持久化存储backend。怎么判别呢?答案就是先看memoryMsgChan是否已经满了,如果满了就不能继续塞了,那就存到后端持久化存储里面去。
memoryMsgChan的容量由 getOpts().MemQueueSize设置,在上面的 NewTopic 函数里面进行初始化,之后不能修改了。

func (t *Topic) put(m *Message) error {
    select {
        //将这条消息直接塞入内存管道
    case t.memoryMsgChan <- m:
    default://如果内存消息管道满了(memoryMsgChan的容量由 getOpts().MemQueueSize设置),那么就放入到后面的持久化存储里面           
        b := bufferPoolGet()                                                  
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
    }
    return nil
}

topic数据结构主要的差不多了,实现文件在nsqd/topic.go,很简练,总结一下就是nsq的topic主要记录有哪些channel,以及内存管道memoryMsgChan和持久化存储BackendQueue,每一个topic后面会有一个消息协程负责处理这个topic的事务。
用原生内存channel管道当消息队列,也是挺有创意的,不过仔细想想,也有些他的优缺点,比如这channel里的消息,基本上就没有保障了,如果挂了什么的。当然如果为了性能,可以设置容量少一些,但是还是有弊端的。

关于具体后面topic的消息到memoryMsgChan,以及再到channel最后到client的流程,篇幅有点长,改天再写。

Share
分类: GO, nsqd, nsqd 标签: , ,
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。