首页 > GO, nsqd, nsqd > nsqlookupd 源码分析(2)- topic通道注册,查询代码分析

nsqlookupd 源码分析(2)- topic通道注册,查询代码分析

2018年6月5日 发表评论 阅读评论 29733次阅读    

上篇文章介绍了一下nsqlookupd 启动服务的简单流程,限于篇幅到这里来写一下nsqlookupd主要的工作:通道注册登记,以及查询服务。
nsqlookupd 有两个主要的功能:

  1. nsqd来注册所有topic、channel的分布信息,nsqlookupd记录哪些topic在哪些机器上;
  2. 给消费端来查询对应的topic所在的机器列表,以供订阅消费;

下面先来讲一下TCP协议端,nsqd是怎么告诉nsqlookupd,他有个新的topic创建的呢? 这就得看 REGISTER操作了。

一、REGISTER 注册topic

1.nsqd端发送 REGISTER 命令 等级topic、channel

之前我们了解到,当nsqd创建一个topic的时候,NewTopic函数会调用nsqd.Notify(t) 来通知lookupd有新的topic产生了。 后者实际上通过channel通知lookupLoop协程进行处理的,处理代码:

        case val := <-n.notifyChan: //通知chanel, 一般有topic,channel上的增删的时候会写入管道, 进而通知其他lookup去处理
            var cmd *nsq.Command
            var branch string

            switch val.(type) {
            case *Channel:
                // notify all nsqlookupds that a new channel exists, or that it's removed
                branch = "channel"
                channel := val.(*Channel)
                if channel.Exiting() == true {
                    cmd = nsq.UnRegister(channel.topicName, channel.name)
                } else {//生成对应的指令
                    cmd = nsq.Register(channel.topicName, channel.name)
                }    
            }

所以如果nsqd上新增了一个topic, 那么就会调用nsq.Register() 生成一条REGISTER 命令: &Command{[]byte("REGISTER"), params, nil},之后遍历所有的lookupPeers,给他们发送注册指令:

            //循环在所有的lookup上执行这条指令
            for _, lookupPeer := range lookupPeers {
                n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
                }
            } 

所以这里注册指令的来源比较清楚了。下面开始了解一下nsqlookupd是怎么处理的。

nsqlookupd 登记topic、channel对应的nsqd列表

nsqlookupd的TCP协议处理函数实际是LookupProtocolV1::IOLoop, 这函数比较简单,主要就是调用了Exec函数,后者很简单,一共4个命令,我们只关注 REGISTER, 其他都比较简单或者类似:

func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    switch params[0] {
    case "PING":
        return p.PING(client, params)
    case "IDENTIFY":
        return p.IDENTIFY(client, reader, params[1:])
    case "REGISTER":
        return p.REGISTER(client, reader, params[1:])
    case "UNREGISTER":
        return p.UNREGISTER(client, reader, params[1:])
    }   
    return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

下面直接看REGISTER 函数, 函数首先用getTopicChan 读取nsqd发送过来的topic和channel, 然后根据是否有channel来进行对应的处理:如果有channel,就需要记录(topic,channel)对,否则只需要记录topic就行了,这里数据结构是共用的.
对于topic,channel 到 nsqd的映射,之前的文章“nsqlookupd 源码分析(1)- 启动服务” 里面讲过 RegistrationDB的结构,这里参考来看会比较直接。

func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    //nsqd发送注册登记指令REGISTER, 来通知lookup他新建了一个topic和channel
    //nsqlookupd需要记录topic对应的nsqd列表,以备用来查询等
    topic, channel, err := getTopicChan("REGISTER", params)

    if channel != "" {//如果有channel, 需要单独记录一下channel,因为topic和channel可以1:n的
        key := Registration{"channel", topic, channel}
        //调用AddProducer 将映射关系放入map里面 
        p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) ;
    }
    key := Registration{"topic", topic, ""}
     p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) ;

    return []byte("OK"), nil 
}

AddProducer 比较简单,就是对registrationMap 的查询,如果当前这个客户端以及在映射表里面就不需要处理,否则就加进去。

func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
    //将一个nsqd加入到k参数指定的topic、channel映射表里面,先查是否已经在里面了,没有就加入
    r.Lock()                           
    defer r.Unlock()                   
    producers := r.registrationMap[k]  
    found := false                     
    for _, producer := range producers {
        if producer.peerInfo.id == p.peerInfo.id {
            found = true
            break
        }   
    }
    if found == false {
        r.registrationMap[k] = append(producers, p)
    }   
    return !found
}

UNREGISTER 注销也是类似的,当nsqd退出,或者topic删除了的话,会调用UNREGISTER 命令,类似REGISTER处理就行了。

二、“/lookup” 查询topic所在机器列表以及channel列表

查询方法

当消费者需要查询某个topic在哪些nsqd上时,可以发送/lookup 的http命令查询,nsqlookupd的查询topic信息的接口是这样的:

curl 'http://127.0.0.1:4161/lookup?topic=testtopic'

返回如下:

可以看到,/lookup命令会返回该topic对应的channel列表,以及都有哪些nsqd上面有这个topic。下面来看看代码:

/lookup 方法原理

/lookup指令的处理函数是 doLookup, 这是在Main的 newHTTPServer调用上设置的:

func newHTTPServer(ctx *Context) *httpServer {
    router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
    router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
    router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
    router.Handle("GET", "/nodes", http_api.Decorate(s.doNodes, log, http_api.V1))

    router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
    router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
}

doLookup函数先获取topic参数,然后查找nsqlookupd.DB 对应的topic的信息,如果没有查到就返回404,否则返回对应的json信息:

func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
    //lookup命令会返回该topic对应的channel列表,以及都有哪些nsqd上面有这个topic 
    reqParams, err := http_api.NewReqParams(req)
    topicName, err := reqParams.Get("topic")

    //先查询map里面是否有对应topic的记录
    registration := s.ctx.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
    //得到所有的channel列表
    channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
    //得到 有这个topic的nsqd列表,后面需要去掉不活跃的机器
    producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "")
    producers = producers.FilterByActive(s.ctx.nsqlookupd.opts.InactiveProducerTimeout,
        s.ctx.nsqlookupd.opts.TombstoneLifetime)

    //返回json字符串
    return map[string]interface{}{
        "channels":  channels,
        "producers": producers.PeerInfo(),
    }, nil
}

调用的FindRegistrations 和 FindProducers 函数实现都在/nsqlookupd/registration_db.go 里面,代码比较简单,基本上就是简单匹配,以及map操作,这里不多说了。

三、 创建一个topic

顺便讲一下nsqlookupd上的创建一个topic是怎么操作的,会发生什么。 同样是HTTP接口,调用方法为“/topic/create”, 实现函数是doCreateTopic , 比较简单,其实就是在lookupd上增加了一条topic的记录,并且他的producers为空:

func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
    //创建一个topic, 只是简单的记录了一下topic信息,并且加到DB里面,对应的nsqd数组为空
    reqParams, err := http_api.NewReqParams(req) 
    topicName, err := reqParams.Get("topic")
    key := Registration{"topic", topicName, ""}
    s.ctx.nsqlookupd.DB.AddRegistration(key)
    return nil, nil
}

func (r *RegistrationDB) AddRegistration(k Registration) { 
    r.Lock()
    defer r.Unlock()
    _, ok := r.registrationMap[k]
    if !ok {
        r.registrationMap[k] = Producers{}
    }
}

到这里nsqlookupd 的主要操作:登记topic和查询topic的代码都介绍了一下,比较简单,并且没有持久化,他的持久化其实是借用的nsqd的持久化,因为所有nsqd启动的时候,都会立即链接nsqlookupd,然后注册对应的topic, 所以nsqlookupd 没必要进行持久化操作了。

另外在实际应用中,nsqlookupd可以多设置几个也是支持的,nsqd可以连接多个nsqlookupd服务,同时注册,灵活性比较高。
nsqlookupd 代码算是最简单的web服务的样子,简单的每个client一个go协程进行处理,内存map记录topic->nsqd 的映射关系,http提供查询,增删改操作。

Share
分类: GO, nsqd, nsqd 标签: , ,
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。