首页 > GO, nsqd, nsqd > nsqlookupd 源码分析(1)- 启动服务

nsqlookupd 源码分析(1)- 启动服务

2018年6月4日 发表评论 阅读评论 5568次阅读    

之前的文章写了nsqd的代码逻辑,这里简单介绍一下作为负载均衡或者消息topic/channel汇总的nsqlookupd 的实现原理。相对来说nsqlookupd 比nsqd简单很多,不需要持久化,不需要太多的携程,也没有那么多channel。因此这里简单介绍一下。

nsqlookupd作为其他nsqd的中心,主要负责topic的等级注册,以及channel的登记注册,topic位置查询等服务。生产者一般可以不用连接nsqlookupd,但是消费者在消费时,如果是分布式环境就得连接nsqlookupd,查询指定topic在哪些机器上,进而连接对应的机器,SUB上去消费内容。这里类似redis的集群方案,只是redis集群是纯分散的,而nsqd则是把这个任务交给了nsqlookupd, 这样有利有弊,设计结构复杂一些,但是nsqlookupd的存在让服务维护容易,并且模块清晰一些。


下面分3个方面开始介绍:

  1. 启动服务;
  2. TCP协议订阅/取消TOPIC;
  3. HTTP协议查询/创建topic服务;

这里简单写一下 启动服务的流程,其实相对nsqd来说很简单。

启动服务

跟nsqd类似,启动服务都是在apps/nsqlookupd/nsqlookupd.go里面完成的,同样,外面包了一层go-svc,具体在“nsqd 源码分析(1)- 启动服务” 里面有介绍。最后框架代码会调用program.Start()函数, 后者主要技术加载配置文件,加载命令行参数,nsqlookupd 参数非常简单:tcp-address, http-address, broadcast-address,以及日志级别的东西。
之后就创建NSQLookupd结构,调用其 Main 函数,之后就会返回到框架代码进行等待信号量,这里的信号量是指操作系统级别的kill等信号,一旦捕捉到退出信号,就会调用program.Stop()函数进行优雅退出。

func (p *program) Start() error {
    opts := nsqlookupd.NewOptions()
    //other
    options.Resolve(opts, flagSet, cfg)
    daemon := nsqlookupd.New(opts)
    //调用业务的main函数,Main函数会创建协程进行处理, 立即返回
    daemon.Main()
    p.nsqlookupd = daemon
    return nil
}

nsqlookupd.New(opts) 也很简单,就是初始化了一个NSQLookupd结构,设置好日志级别。稍微主要的是NSQLookupd.DB结构,这是一个记录所有topic、channel以及其对应的nsqd的结构,大概来看一下:

registrationMap就是一个topic/channel 映射到 Producers也就是nsqd的map。代码如下:

func New(opts *Options) *NSQLookupd {
    //初始化日志,new一个NSQLookupd返回,没干别的 
    n := &NSQLookupd{
        opts: opts,
        DB:   NewRegistrationDB()
    }
}

NewRegistrationDB() 返回一个RegistrationDB 结构,大概的数据结构类型画图如下如下:

type RegistrationDB struct {
    sync.RWMutex
    registrationMap map[Registration]Producers  //map, 记录topic、channel 对应的 所有nsqd服务器信息
}

type Registration struct {
    Category string
    Key      string
    SubKey   string
}
type Registrations []Registration  //这是一个topic/channel的数组

type PeerInfo struct {
    lastUpdate       int64
    id               string
    RemoteAddress    string `json:"remote_address"`
    Hostname         string `json:"hostname"`
    BroadcastAddress string `json:"broadcast_address"`
    TCPPort          int    `json:"tcp_port"`
    HTTPPort         int    `json:"http_port"`
    Version          string `json:"version"`
}

type Producer struct {
    peerInfo     *PeerInfo   //nsqd服务器网络信息,端口等
    tombstoned   bool
    tombstonedAt time.Time
}

接下来就是Main函数了,做的事情也比较简单,Maint函数启动了TCP协议和HTTP协议的协程进行处理。

首先打开TCP协议端口

老办法,用waitGroup 开启协程,调用 protocol.TCPServer(), 传入处理类是 tcpServer,实际上调用的LookupProtocolV1::IOLoop函数,具体后面的章节在讲。

func (l *NSQLookupd) Main() {
    //apps/nsqlookupd/nsqlookupd.go 调用这里
    ctx := &Context{l}
    tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
    l.Lock()
    l.tcpListener = tcpListener
    l.Unlock()
    //tcp协议处理函数其实是LookupProtocolV1::IOLoop,  支持IDENTIFY, REGISTER, UNREGISTER 操作
    tcpServer := &tcpServer{ctx: ctx}
    l.waitGroup.Wrap(func() {
        protocol.TCPServer(tcpListener, tcpServer, l.logf)
    })

tcpServer.Handle 函数实现如下。

func (p *tcpServer) Handle(clientConn net.Conn) {
    //如果有客户端连接,上层会创建一个协程,调用这里.
    //本函数判断协议版本,之后就基本就调用了LookupProtocolV1::IOLoop
        switch protocolMagic {
    case "  V1":
        prot = &LookupProtocolV1{ctx: p.ctx}

    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
        return
    }
}
启动HTTP协议端口

http协议类似,设置方法也差不多,不过使用的是newHTTPServer()生成的一个handle类,以及使用 http_api 包。

    httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
    if err != nil {
        l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err)
        os.Exit(1)
    }
    l.Lock()
    l.httpListener = httpListener
    l.Unlock()
    //http协议要复杂很多
    httpServer := newHTTPServer(ctx)
    l.waitGroup.Wrap(func() {
        http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
    })
}   

服务tcp,http端口打开后就可以接受客户端连接,进行topic注册和注销的操作,以及查询请求了。后面的文章在简单讲一下TCP协议和HTTP协议的相关处理。

Share
分类: GO, nsqd, nsqd 标签: , ,

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。