首页 > C/C++, GO, nsqd, nsqd > nsqd 源码分析(1)- 启动服务

nsqd 源码分析(1)- 启动服务

2018年5月19日 发表评论 阅读评论 29630次阅读    

前段时间了解Go语言,开始了解Go的优秀开源服务,所以找出NSQ学习一下,下载了一份nsq源码开始学习,顺便记录一下阅读的笔记。

对协程实现原理感兴趣的话推荐看一看libtask,几年前在看源码的时候写过2篇笔记,里面有代码和注释:libtask协程库实现源码学习

NSQ是一个实时分布式消息处理服务,支持分布式,其源码包括:
1. nsqd 作为一个主要的消息接受,发送, 跟客户端沟通的后台进程,负责监听客户端连接,处理客户端的发送接收请求;
2. nsqlookupd 是一个管理进程,其他所有nsqd启动会尝试不断连接他,上报topic信息等;
3. nsqadmin 管理进程;
4. 其他便利工具;


nsq 代码目录如下:

.
├── apps
│   ├── nsqadmin
│   ├── nsqd  主要服务,监听请求服务客户端
│   ├── nsqlookupd
│   └── 。。。
├── bench
├── build
├── contrib
├── internal
│   ├── app
│   ├── clusterinfo
│   ├── dirlock
│   ├── http_api
│   ├── protocol  协议处理库,包括http, tcp等
│   ├── util
├── nsqadmin
│   ├── static
│   └── test
├── nsqd
│   └── test
└── nsqlookupd

程序main函数其实是在appapps/nsqd/nsqd.go, 这里经过一层包装后,实际的业务处理代码还是在nsqd/nsqd.go 进行处理。前者只是利用go-svc 进行了一层服务包装而已。

一、main函数入口

nsqd程序入口在 apps/nsqd/nsqd.go: main, 代码非常简单:

func main() {
    prg := &program{}
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        log.Fatal(err)
    }
}

其实是用了一个开源包装库:github.com/judwhite/go-svc/svc , svc.Run函数非常简单,就是一个包装,先后调用传入参数service的service.Init(env); service.Start(); service.Stop() , 同事注册了信号处理函数,方便程序优雅退出。最重要的函数就是program.Start了,
值得注意的是,Run 函数后面会创建一个signalChan 信号量,绑定SIGINT, SIGTEM命令,这样service.Start()运行结束后,会阻塞在<-signalChan 上面,直到程序进入退出逻辑后,才会运行service.Stop() 进行资源释放,优雅退出。

func Run(service Service, sig ...os.Signal) error {
    env := environment{}
    if err := service.Init(env); err != nil {
        return err 
    }   

    if err := service.Start(); err != nil {
        return err 
    }   

    if len(sig) == 0 { 
        sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
    }   

    //下面创建信号监听函数,用信号管道signalChan 监听,绑定SIGINT, SIGTEM命令,
    //这样service.Start()运行结束后,会阻塞在&lt;-signalChan 上面,直到程序进入退出逻辑后,
    //才会运行service.Stop() 进行资源释放,优雅退出
    signalChan := make(chan os.Signal, 1)
    signalNotify(signalChan, sig...)
    //主协程会阻塞在这里
    &lt;-signalChan

    return service.Stop()
}

二、程序启动准备函数program.Start

program.Start 开始解析命令行参数,进行格式校验,然后调用nsqd.LoadMetadata() 加载topics数据,随后nsqd.PersistMetadata() 持久化数据,最后调用nsqd.Main() 开始监听服务,后者运行后会创建相关的协程进行监听请求。

func (p *program) Start() error {
    //解析配置,默认 &lt; 配置文件 &lt;命令行配置, 最后使用options包合并配置到cfg
    opts := nsqd.NewOptions()
    flagSet := nsqdFlagSet(opts)
    flagSet.Parse(os.Args[1:])
    configFile := flagSet.Lookup(&quot;config&quot;).Value.String()
    cfg.Validate()

    options.Resolve(opts, flagSet, cfg)
    nsqd := nsqd.New(opts)//调用new函数创建一个nsqd结构,传入上面解析的配置信息. 初始化NSQD结构,加锁数据目录,初始化https配置

    err := nsqd.LoadMetadata()//加载数据, 初始化n.topics结构

    //持久化当前的topic,channel 数据结构,不涉及到数据不封顶持久化. 写入临时文件后改名
    //怎么刚刚启动就要持久化呢?原因是? 搞回滚用? 清理之前的回滚信息? 比如之前有失败执行的,后来改了要求的
    err = nsqd.PersistMetadata()
    nsqd.Main()//开始监听服务

    p.nsqd = nsqd
    //start返回后会进入到svc的代码里面进行等待,监听信号量如果用户杀进程,就调用下面的stop
    return nil
}

上面关于数据加载,有2个类似的函数:

1. LoadMetadata

这函数用来读取当前的nsqd.dat文件内容,nsq对于配置文件的写入,都是用先写临时文件然后进行rename,所以在配置加载的时候也需要读取2个文件,按进行比较, 看newMetadataFile和oldMetadataFile是否一样,如果不一样就报错。 随后json.Unmarshal(data, &m) 格式化json文件内容,循环Topics然后递归扫描其Channels列表。
LoadMetadata 在扫描topic和channel的时候,分别会调用 GetTopic, GetChannel, 其实这两个函数如果在判断topic不存在的时候,会创建他,并且跟lookupd进行联系,这里就不细化了。

2. PersistMetadata

持久化当前的topic,channel 数据结构,不涉及到数据不封顶持久化. 写入临时文件后改名, 最后的文件就是nsqd.data。文件比较长,主要是为了保证操作安全,做尽量保证原值操作的,函数会写了2次文件,第一次是json 数据文件,写好后重命名。 先写入临时文件,然后做一次重命名(os.Rename),这样避免中间出问题只写了一部分数据,rename是原子操作,所以安全,避免不一致性的发生。

开始监听请求 nsqd.Main()

program.Run 函数运行后会调用nsqd.Main(), 此时配置以及解析完成,topic存储文件也加载好了,因此接下来可以开始监听客户端请求了, 函数之后还好启动其他协程进行队列扫描,连接lookupd循环等。

打开TCP端口监听请求

我们知道go的tcp监听函数很简单,就是net.Listen() 后开始accept就可以了。nsqd稍微做了一点包装,用protocol.TCPServer 进行了一次封装,先看一下Maint 头部代码:

func (n *NSQD) Main() {
    //开启各个端口监听客户端请求,创建各项后台携程进行处理
    var httpListener net.Listener
    var httpsListener net.Listener
    //初始化一个context的指针,里面就
    ctx := &amp;context{n}
    //创建TCP协议的监听句柄监听请求
    tcpListener, err := net.Listen(&quot;tcp&quot;, n.getOpts().TCPAddress)
    n.Lock()
    n.tcpListener = tcpListener
    n.Unlock()
    //tcpServer是个接口,里面有个context的指针,之外就是Handle函数了,函数读取协议版本然后调用prot.IOLoop进行消息读取和解析
    //处理流程在internal/protocol/tcp_server.go 
    tcpServer := &amp;tcpServer{ctx: ctx}
    n.waitGroup.Wrap(func() {
        //循环accept客户端请求,然后创建协程进行消息读写循环prot.IOLoop(clientConn)
        protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
    })

函数首先开始处理TCP端口的协议部分,调用net.Listen 开启端口后,将句柄交给了接口protocol.TCPServer 进行处理, 传入 tcpServer 参数。而前者 protocol.TCPServer 是一个封装函数,在多个地方会用到,相当于tcpServer 是一个自定义协议的适配类。

异步处理每一个端口的监听请求

上面可以看到,在进行accept之前,是用n.waitGroup.Wrap()进行封装的,这其实是一个简单的封装函数,将function参数作为一个协程进行异步处理。也就是说,nsqd对于每一个监听端口,都创建一个协程进行循环accept。

下面来看一下protocol.TCPServer的代码,TCPServer 用来进行accept然后创立一个go协程调用handler的Handle函数处理后面的逻辑, 参数Listener是一个监听的net.Listen类,TCPHandler 是一个tcpServer 类,用来处理上层具体业务逻辑的。所以其实TCPServer 就是一个listener.Accept()的粉钻,接受客户端连接后,进行错误检查,然后将连接抛给参数hander的Handle函数去处理。

//Listener是一个监听的net.Listen类,TCPHandler 是一个tcpServer 类,用来处理事件的
//TCPServer 用来进行accept然后就创立一个go协程调用handler的Handle函数处理后面的逻辑
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
    logf(lg.INFO, &quot;TCP: listening on %s&quot;, listener.Addr())
    for {
        //等待接受一个客户端连接
        clientConn, err := listener.Accept()
        //在协程中处理这一个客户端请求, 直至结束连接
        go handler.Handle(clientConn)
    }
    logf(lg.INFO, &quot;TCP: closing %s&quot;, listener.Addr())
}

由上面的handler.Handle(clientConn) 可以知道,nsqd是每接受一个客户端连接,就立即创建一个协程进行处理请求,对于TCP协议的情况,实际上是nsqd/tcp.go 的 tcpServer.Handle 接口进行处理的, 具体后面在详细介绍。

打开HTTP端口监听请求

这部分跟TCP类似,这里暂时不赘述了。

总之,nsqd创建协程异步 简单listen后就调用protocol.TCPServer 开始accept客户端连接,创建客户端连接后 立即创建一个go协程去处理这一个客户端的请求,非常简单,代码很清晰,比C语言程序简单多了。

创建辅助协程异步处理管理事务

main后面会创建下面几个协程:
1. 队列scan扫描协程;
2. lookup的查找协程;
3. 内部状态处理协程;

main结束后返回到program.Start, 后者退出后返回到svc的代码里面进行等待,监听信号量如果用户杀进程, 就调用program.Stop函数。

    //队列scan扫描协程
    n.waitGroup.Wrap(func() { n.queueScanLoop() })
    //lookup的查找协程
    n.waitGroup.Wrap(func() { n.lookupLoop() })
    //如果配置了状态地址,开启状态协程                                                                                           
    if n.getOpts().StatsdAddress != &quot;&quot; {
        n.waitGroup.Wrap(func() { n.statsdLoop() })
    }
    //至此main函数结束,上面基本上开启了tcp,https,http协程开始不断accept客户端请求并且进行处理
    //stop函数实际上调用到了p.nsqd.Exit()
}

到这里, nsqd.Maint函数会直接返回,在这期间会创建各个业务处理协程进行处理, 最后到框架代码里开始阻塞,直到程序收到SIGINT等信号后进行优雅退出处理。
这就是协程的好处,随处创建,代价很低,代码逻辑变得非常清晰。不过程序员需要关注代码在什么上下文环境中运行的,这样才能真正理解到go语言的优势。
到此程序启动完毕,接下来就是监听端口处理请求,以及channel消息的处理了。后面的文章在详细写。

最后再推荐一下libtask源码,应该算协程实现原理最简单,最好的代码示例了。

Share
分类: C/C++, GO, nsqd, nsqd 标签: , , ,
  1. kaka
    2019年7月24日15:07 | #1

    代码中部分特殊符号被转码了,影响阅读

  1. 2018年6月5日00:14 | #1

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。