nsqd 源码分析(1)- 启动服务
前段时间了解Go语言,开始了解Go的优秀开源服务,所以找出NSQ学习一下,下载了一份nsq源码开始学习,顺便记录一下阅读的笔记。
对协程实现原理感兴趣的话推荐看一看libtask,几年前在看源码的时候写过2篇笔记,里面有代码和注释:libtask协程库实现源码学习
NSQ是一个实时分布式消息处理服务,支持分布式,其源码包括:
1. nsqd 作为一个主要的消息接受,发送, 跟客户端沟通的后台进程,负责监听客户端连接,处理客户端的发送接收请求;
2. nsqlookupd 是一个管理进程,其他所有nsqd启动会尝试不断连接他,上报topic信息等;
3. nsqadmin 管理进程;
4. 其他便利工具;
nsq 代码目录如下:
. ├── apps │ ├── nsqadmin │ ├── nsqd 主要服务,监听请求服务客户端 │ ├── nsqlookupd │ └── 。。。 ├── bench ├── build ├── contrib ├── internal │ ├── app │ ├── clusterinfo │ ├── dirlock │ ├── http_api │ ├── protocol 协议处理库,包括http, tcp等 │ ├── util ├── nsqadmin │ ├── static │ └── test ├── nsqd │ └── test └── nsqlookupd
程序main函数其实是在appapps/nsqd/nsqd.go, 这里经过一层包装后,实际的业务处理代码还是在nsqd/nsqd.go 进行处理。前者只是利用go-svc 进行了一层服务包装而已。
一、main函数入口
nsqd程序入口在 apps/nsqd/nsqd.go: main, 代码非常简单:
func main() { prg := &program{} if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { log.Fatal(err) } }
其实是用了一个开源包装库:github.com/judwhite/go-svc/svc , svc.Run函数非常简单,就是一个包装,先后调用传入参数service的service.Init(env); service.Start(); service.Stop()
, 同事注册了信号处理函数,方便程序优雅退出。最重要的函数就是program.Start了,
值得注意的是,Run 函数后面会创建一个signalChan 信号量,绑定SIGINT, SIGTEM命令,这样service.Start()运行结束后,会阻塞在<-signalChan 上面,直到程序进入退出逻辑后,才会运行service.Stop() 进行资源释放,优雅退出。
func Run(service Service, sig ...os.Signal) error { env := environment{} if err := service.Init(env); err != nil { return err } if err := service.Start(); err != nil { return err } if len(sig) == 0 { sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM} } //下面创建信号监听函数,用信号管道signalChan 监听,绑定SIGINT, SIGTEM命令, //这样service.Start()运行结束后,会阻塞在<-signalChan 上面,直到程序进入退出逻辑后, //才会运行service.Stop() 进行资源释放,优雅退出 signalChan := make(chan os.Signal, 1) signalNotify(signalChan, sig...) //主协程会阻塞在这里 <-signalChan return service.Stop() }
二、程序启动准备函数program.Start
program.Start 开始解析命令行参数,进行格式校验,然后调用nsqd.LoadMetadata() 加载topics数据,随后nsqd.PersistMetadata() 持久化数据,最后调用nsqd.Main() 开始监听服务,后者运行后会创建相关的协程进行监听请求。
func (p *program) Start() error { //解析配置,默认 < 配置文件 <命令行配置, 最后使用options包合并配置到cfg opts := nsqd.NewOptions() flagSet := nsqdFlagSet(opts) flagSet.Parse(os.Args[1:]) configFile := flagSet.Lookup("config").Value.String() cfg.Validate() options.Resolve(opts, flagSet, cfg) nsqd := nsqd.New(opts)//调用new函数创建一个nsqd结构,传入上面解析的配置信息. 初始化NSQD结构,加锁数据目录,初始化https配置 err := nsqd.LoadMetadata()//加载数据, 初始化n.topics结构 //持久化当前的topic,channel 数据结构,不涉及到数据不封顶持久化. 写入临时文件后改名 //怎么刚刚启动就要持久化呢?原因是? 搞回滚用? 清理之前的回滚信息? 比如之前有失败执行的,后来改了要求的 err = nsqd.PersistMetadata() nsqd.Main()//开始监听服务 p.nsqd = nsqd //start返回后会进入到svc的代码里面进行等待,监听信号量如果用户杀进程,就调用下面的stop return nil }
上面关于数据加载,有2个类似的函数:
1. LoadMetadata
这函数用来读取当前的nsqd.dat文件内容,nsq对于配置文件的写入,都是用先写临时文件然后进行rename,所以在配置加载的时候也需要读取2个文件,按进行比较, 看newMetadataFile和oldMetadataFile是否一样,如果不一样就报错。 随后json.Unmarshal(data, &m) 格式化json文件内容,循环Topics然后递归扫描其Channels列表。
LoadMetadata 在扫描topic和channel的时候,分别会调用 GetTopic, GetChannel, 其实这两个函数如果在判断topic不存在的时候,会创建他,并且跟lookupd进行联系,这里就不细化了。
2. PersistMetadata
持久化当前的topic,channel 数据结构,不涉及到数据不封顶持久化. 写入临时文件后改名, 最后的文件就是nsqd.data。文件比较长,主要是为了保证操作安全,做尽量保证原值操作的,函数会写了2次文件,第一次是json 数据文件,写好后重命名。 先写入临时文件,然后做一次重命名(os.Rename),这样避免中间出问题只写了一部分数据,rename是原子操作,所以安全,避免不一致性的发生。
开始监听请求 nsqd.Main()
program.Run 函数运行后会调用nsqd.Main(), 此时配置以及解析完成,topic存储文件也加载好了,因此接下来可以开始监听客户端请求了, 函数之后还好启动其他协程进行队列扫描,连接lookupd循环等。
打开TCP端口监听请求
我们知道go的tcp监听函数很简单,就是net.Listen() 后开始accept就可以了。nsqd稍微做了一点包装,用protocol.TCPServer 进行了一次封装,先看一下Maint 头部代码:
func (n *NSQD) Main() { //开启各个端口监听客户端请求,创建各项后台携程进行处理 var httpListener net.Listener var httpsListener net.Listener //初始化一个context的指针,里面就 ctx := &context{n} //创建TCP协议的监听句柄监听请求 tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress) n.Lock() n.tcpListener = tcpListener n.Unlock() //tcpServer是个接口,里面有个context的指针,之外就是Handle函数了,函数读取协议版本然后调用prot.IOLoop进行消息读取和解析 //处理流程在internal/protocol/tcp_server.go tcpServer := &tcpServer{ctx: ctx} n.waitGroup.Wrap(func() { //循环accept客户端请求,然后创建协程进行消息读写循环prot.IOLoop(clientConn) protocol.TCPServer(n.tcpListener, tcpServer, n.logf) })
函数首先开始处理TCP端口的协议部分,调用net.Listen 开启端口后,将句柄交给了接口protocol.TCPServer 进行处理, 传入 tcpServer 参数。而前者 protocol.TCPServer 是一个封装函数,在多个地方会用到,相当于tcpServer 是一个自定义协议的适配类。
异步处理每一个端口的监听请求
上面可以看到,在进行accept之前,是用n.waitGroup.Wrap()进行封装的,这其实是一个简单的封装函数,将function参数作为一个协程进行异步处理。也就是说,nsqd对于每一个监听端口,都创建一个协程进行循环accept。
下面来看一下protocol.TCPServer的代码,TCPServer 用来进行accept然后创立一个go协程调用handler的Handle函数处理后面的逻辑, 参数Listener是一个监听的net.Listen类,TCPHandler 是一个tcpServer 类,用来处理上层具体业务逻辑的。所以其实TCPServer 就是一个listener.Accept()的粉钻,接受客户端连接后,进行错误检查,然后将连接抛给参数hander的Handle函数去处理。
//Listener是一个监听的net.Listen类,TCPHandler 是一个tcpServer 类,用来处理事件的 //TCPServer 用来进行accept然后就创立一个go协程调用handler的Handle函数处理后面的逻辑 func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) { logf(lg.INFO, "TCP: listening on %s", listener.Addr()) for { //等待接受一个客户端连接 clientConn, err := listener.Accept() //在协程中处理这一个客户端请求, 直至结束连接 go handler.Handle(clientConn) } logf(lg.INFO, "TCP: closing %s", listener.Addr()) }
由上面的handler.Handle(clientConn) 可以知道,nsqd是每接受一个客户端连接,就立即创建一个协程进行处理请求,对于TCP协议的情况,实际上是nsqd/tcp.go 的 tcpServer.Handle 接口进行处理的, 具体后面在详细介绍。
打开HTTP端口监听请求
这部分跟TCP类似,这里暂时不赘述了。
总之,nsqd创建协程异步 简单listen后就调用protocol.TCPServer 开始accept客户端连接,创建客户端连接后 立即创建一个go协程去处理这一个客户端的请求,非常简单,代码很清晰,比C语言程序简单多了。
创建辅助协程异步处理管理事务
main后面会创建下面几个协程:
1. 队列scan扫描协程;
2. lookup的查找协程;
3. 内部状态处理协程;
main结束后返回到program.Start, 后者退出后返回到svc的代码里面进行等待,监听信号量如果用户杀进程, 就调用program.Stop函数。
//队列scan扫描协程 n.waitGroup.Wrap(func() { n.queueScanLoop() }) //lookup的查找协程 n.waitGroup.Wrap(func() { n.lookupLoop() }) //如果配置了状态地址,开启状态协程 if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(func() { n.statsdLoop() }) } //至此main函数结束,上面基本上开启了tcp,https,http协程开始不断accept客户端请求并且进行处理 //stop函数实际上调用到了p.nsqd.Exit() }
到这里, nsqd.Maint函数会直接返回,在这期间会创建各个业务处理协程进行处理, 最后到框架代码里开始阻塞,直到程序收到SIGINT等信号后进行优雅退出处理。
这就是协程的好处,随处创建,代价很低,代码逻辑变得非常清晰。不过程序员需要关注代码在什么上下文环境中运行的,这样才能真正理解到go语言的优势。
到此程序启动完毕,接下来就是监听端口处理请求,以及channel消息的处理了。后面的文章在详细写。
最后再推荐一下libtask源码,应该算协程实现原理最简单,最好的代码示例了。
代码中部分特殊符号被转码了,影响阅读