nsqlookupd 源码分析(1)- 启动服务
之前的文章写了nsqd的代码逻辑,这里简单介绍一下作为负载均衡或者消息topic/channel汇总的nsqlookupd 的实现原理。相对来说nsqlookupd 比nsqd简单很多,不需要持久化,不需要太多的携程,也没有那么多channel。因此这里简单介绍一下。
nsqlookupd作为其他nsqd的中心,主要负责topic的等级注册,以及channel的登记注册,topic位置查询等服务。生产者一般可以不用连接nsqlookupd,但是消费者在消费时,如果是分布式环境就得连接nsqlookupd,查询指定topic在哪些机器上,进而连接对应的机器,SUB上去消费内容。这里类似redis的集群方案,只是redis集群是纯分散的,而nsqd则是把这个任务交给了nsqlookupd, 这样有利有弊,设计结构复杂一些,但是nsqlookupd的存在让服务维护容易,并且模块清晰一些。
下面分3个方面开始介绍:
- 启动服务;
- TCP协议订阅/取消TOPIC;
- HTTP协议查询/创建topic服务;
这里简单写一下 启动服务的流程,其实相对nsqd来说很简单。
启动服务
跟nsqd类似,启动服务都是在apps/nsqlookupd/nsqlookupd.go里面完成的,同样,外面包了一层go-svc,具体在“nsqd 源码分析(1)- 启动服务” 里面有介绍。最后框架代码会调用program.Start()函数, 后者主要技术加载配置文件,加载命令行参数,nsqlookupd 参数非常简单:tcp-address, http-address, broadcast-address,以及日志级别的东西。
之后就创建NSQLookupd结构,调用其 Main 函数,之后就会返回到框架代码进行等待信号量,这里的信号量是指操作系统级别的kill等信号,一旦捕捉到退出信号,就会调用program.Stop()函数进行优雅退出。
func (p *program) Start() error { opts := nsqlookupd.NewOptions() //other options.Resolve(opts, flagSet, cfg) daemon := nsqlookupd.New(opts) //调用业务的main函数,Main函数会创建协程进行处理, 立即返回 daemon.Main() p.nsqlookupd = daemon return nil }
nsqlookupd.New(opts) 也很简单,就是初始化了一个NSQLookupd结构,设置好日志级别。稍微主要的是NSQLookupd.DB结构,这是一个记录所有topic、channel以及其对应的nsqd的结构,大概来看一下:
registrationMap就是一个topic/channel 映射到 Producers也就是nsqd的map。代码如下:
func New(opts *Options) *NSQLookupd { //初始化日志,new一个NSQLookupd返回,没干别的 n := &NSQLookupd{ opts: opts, DB: NewRegistrationDB() } }
NewRegistrationDB() 返回一个RegistrationDB 结构,大概的数据结构类型画图如下如下:
type RegistrationDB struct { sync.RWMutex registrationMap map[Registration]Producers //map, 记录topic、channel 对应的 所有nsqd服务器信息 } type Registration struct { Category string Key string SubKey string } type Registrations []Registration //这是一个topic/channel的数组 type PeerInfo struct { lastUpdate int64 id string RemoteAddress string `json:"remote_address"` Hostname string `json:"hostname"` BroadcastAddress string `json:"broadcast_address"` TCPPort int `json:"tcp_port"` HTTPPort int `json:"http_port"` Version string `json:"version"` } type Producer struct { peerInfo *PeerInfo //nsqd服务器网络信息,端口等 tombstoned bool tombstonedAt time.Time }
接下来就是Main函数了,做的事情也比较简单,Maint函数启动了TCP协议和HTTP协议的协程进行处理。
首先打开TCP协议端口
老办法,用waitGroup 开启协程,调用 protocol.TCPServer(), 传入处理类是 tcpServer,实际上调用的LookupProtocolV1::IOLoop函数,具体后面的章节在讲。
func (l *NSQLookupd) Main() { //apps/nsqlookupd/nsqlookupd.go 调用这里 ctx := &Context{l} tcpListener, err := net.Listen("tcp", l.opts.TCPAddress) l.Lock() l.tcpListener = tcpListener l.Unlock() //tcp协议处理函数其实是LookupProtocolV1::IOLoop, 支持IDENTIFY, REGISTER, UNREGISTER 操作 tcpServer := &tcpServer{ctx: ctx} l.waitGroup.Wrap(func() { protocol.TCPServer(tcpListener, tcpServer, l.logf) })
tcpServer.Handle 函数实现如下。
func (p *tcpServer) Handle(clientConn net.Conn) { //如果有客户端连接,上层会创建一个协程,调用这里. //本函数判断协议版本,之后就基本就调用了LookupProtocolV1::IOLoop switch protocolMagic { case " V1": prot = &LookupProtocolV1{ctx: p.ctx} err = prot.IOLoop(clientConn) if err != nil { p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) return } }
启动HTTP协议端口
http协议类似,设置方法也差不多,不过使用的是newHTTPServer()生成的一个handle类,以及使用 http_api 包。
httpListener, err := net.Listen("tcp", l.opts.HTTPAddress) if err != nil { l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err) os.Exit(1) } l.Lock() l.httpListener = httpListener l.Unlock() //http协议要复杂很多 httpServer := newHTTPServer(ctx) l.waitGroup.Wrap(func() { http_api.Serve(httpListener, httpServer, "HTTP", l.logf) }) }
服务tcp,http端口打开后就可以接受客户端连接,进行topic注册和注销的操作,以及查询请求了。后面的文章在简单讲一下TCP协议和HTTP协议的相关处理。
近期评论