nsqlookupd 源码分析(1)- 启动服务
之前的文章写了nsqd的代码逻辑,这里简单介绍一下作为负载均衡或者消息topic/channel汇总的nsqlookupd 的实现原理。相对来说nsqlookupd 比nsqd简单很多,不需要持久化,不需要太多的携程,也没有那么多channel。因此这里简单介绍一下。
nsqlookupd作为其他nsqd的中心,主要负责topic的等级注册,以及channel的登记注册,topic位置查询等服务。生产者一般可以不用连接nsqlookupd,但是消费者在消费时,如果是分布式环境就得连接nsqlookupd,查询指定topic在哪些机器上,进而连接对应的机器,SUB上去消费内容。这里类似redis的集群方案,只是redis集群是纯分散的,而nsqd则是把这个任务交给了nsqlookupd, 这样有利有弊,设计结构复杂一些,但是nsqlookupd的存在让服务维护容易,并且模块清晰一些。
下面分3个方面开始介绍:
- 启动服务;
- TCP协议订阅/取消TOPIC;
- HTTP协议查询/创建topic服务;
这里简单写一下 启动服务的流程,其实相对nsqd来说很简单。
启动服务
跟nsqd类似,启动服务都是在apps/nsqlookupd/nsqlookupd.go里面完成的,同样,外面包了一层go-svc,具体在“nsqd 源码分析(1)- 启动服务” 里面有介绍。最后框架代码会调用program.Start()函数, 后者主要技术加载配置文件,加载命令行参数,nsqlookupd 参数非常简单:tcp-address, http-address, broadcast-address,以及日志级别的东西。
之后就创建NSQLookupd结构,调用其 Main 函数,之后就会返回到框架代码进行等待信号量,这里的信号量是指操作系统级别的kill等信号,一旦捕捉到退出信号,就会调用program.Stop()函数进行优雅退出。
func (p *program) Start() error {
opts := nsqlookupd.NewOptions()
//other
options.Resolve(opts, flagSet, cfg)
daemon := nsqlookupd.New(opts)
//调用业务的main函数,Main函数会创建协程进行处理, 立即返回
daemon.Main()
p.nsqlookupd = daemon
return nil
}
nsqlookupd.New(opts) 也很简单,就是初始化了一个NSQLookupd结构,设置好日志级别。稍微主要的是NSQLookupd.DB结构,这是一个记录所有topic、channel以及其对应的nsqd的结构,大概来看一下:

registrationMap就是一个topic/channel 映射到 Producers也就是nsqd的map。代码如下:
func New(opts *Options) *NSQLookupd {
//初始化日志,new一个NSQLookupd返回,没干别的
n := &NSQLookupd{
opts: opts,
DB: NewRegistrationDB()
}
}
NewRegistrationDB() 返回一个RegistrationDB 结构,大概的数据结构类型画图如下如下:
type RegistrationDB struct {
sync.RWMutex
registrationMap map[Registration]Producers //map, 记录topic、channel 对应的 所有nsqd服务器信息
}
type Registration struct {
Category string
Key string
SubKey string
}
type Registrations []Registration //这是一个topic/channel的数组
type PeerInfo struct {
lastUpdate int64
id string
RemoteAddress string `json:"remote_address"`
Hostname string `json:"hostname"`
BroadcastAddress string `json:"broadcast_address"`
TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"`
Version string `json:"version"`
}
type Producer struct {
peerInfo *PeerInfo //nsqd服务器网络信息,端口等
tombstoned bool
tombstonedAt time.Time
}
接下来就是Main函数了,做的事情也比较简单,Maint函数启动了TCP协议和HTTP协议的协程进行处理。
首先打开TCP协议端口
老办法,用waitGroup 开启协程,调用 protocol.TCPServer(), 传入处理类是 tcpServer,实际上调用的LookupProtocolV1::IOLoop函数,具体后面的章节在讲。
func (l *NSQLookupd) Main() {
//apps/nsqlookupd/nsqlookupd.go 调用这里
ctx := &Context{l}
tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
l.Lock()
l.tcpListener = tcpListener
l.Unlock()
//tcp协议处理函数其实是LookupProtocolV1::IOLoop, 支持IDENTIFY, REGISTER, UNREGISTER 操作
tcpServer := &tcpServer{ctx: ctx}
l.waitGroup.Wrap(func() {
protocol.TCPServer(tcpListener, tcpServer, l.logf)
})
tcpServer.Handle 函数实现如下。
func (p *tcpServer) Handle(clientConn net.Conn) {
//如果有客户端连接,上层会创建一个协程,调用这里.
//本函数判断协议版本,之后就基本就调用了LookupProtocolV1::IOLoop
switch protocolMagic {
case " V1":
prot = &LookupProtocolV1{ctx: p.ctx}
err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
return
}
}
启动HTTP协议端口
http协议类似,设置方法也差不多,不过使用的是newHTTPServer()生成的一个handle类,以及使用 http_api 包。
httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
if err != nil {
l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err)
os.Exit(1)
}
l.Lock()
l.httpListener = httpListener
l.Unlock()
//http协议要复杂很多
httpServer := newHTTPServer(ctx)
l.waitGroup.Wrap(func() {
http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
})
}
服务tcp,http端口打开后就可以接受客户端连接,进行topic注册和注销的操作,以及查询请求了。后面的文章在简单讲一下TCP协议和HTTP协议的相关处理。

近期评论