geth以太坊源码分析-P2P模块基于UDP的服务发现机制原理(2)
上篇文章分析到了p2p模块是怎么通过UDP协议去测试其他节点的连通性的(geth以太坊源码分析-P2P模块基于UDP的服务发现机制原理(1))。接下来分析一下是怎么发现其他邻近节点的。
节点发现主要是findnode, neighbors消息的处理。
之前文章降到,Table.doRefresh是用来进行节点连接和邻近节点发现的,代码如下:
func (tab *Table) doRefresh(done chan struct{}) { //本函数在协程loop里面调用个,去连接我附近的所有节点, //第一步先连接我链接的所有节点,然后再去lookup其他节点如果链接成功就加入到节点列表。 defer close(done) // Load nodes from the database and insert // them. This should yield a few previously seen nodes that are // (hopefully) still alive. //真正触发连接节点,newTable的时候不会,doRefresh在协程loop里面调用,比较安全 tab.loadSeedNodes(true) // Run self lookup to discover new neighbor nodes. //根据最近的节点进行findNode查找邻近节点, 进行一次扩展, 查找其他节点的peers //首先找距离我自己最近的节点 tab.lookup(tab.self.ID, false)
第一部分我们讲过,后面部分就是调用tab.lookup进行节点发现了。lookup根据最近的节点进行findNode查找邻近节点, 进行一次扩展, 查找其他节点的peers。
节点发现
lookup 函数前面部分,先进行初始化。
func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node { var ( target = crypto.Keccak256Hash(targetID[:]) asked = make(map[NodeID]bool) seen = make(map[NodeID]bool) reply = make(chan []*Node, alpha) pendingQueries = 0 result *nodesByDistance ) // don't query further if we hit ourself. // unlikely to happen often in practice. asked[tab.self.ID] = true for { tab.mutex.Lock() // generate initial result set //调用closest来获取距离target最近的16个节点列表 result = tab.closest(target, bucketSize) tab.mutex.Unlock() if len(result.entries) > 0 || !refreshIfEmpty { break } // The result set is empty, all nodes were dropped, refresh. // We actually wait for the refresh to complete here. The very // first query will hit this case and run the bootstrapping // logic. //完全一个节点都没有,一般不会,如果为空那么调用refresh来触发一次刷新 //实测没进来, 应该时候如果断网或者其他节点都挂掉了,那么此时没有节点连接,那么 //需要阻塞出发一次刷新,期望能够连接上掐节点,同时, refreshIfEmpty=false 只会运行一次循环 //同时,由于 loop()->doRefresh->lookup 这样的调用关系, //所以,doRefresh每次必须在协程进行调用,否则当前协程就会卡死,永远得不到管道信号 <-tab.refresh() refreshIfEmpty = false }
通过tab.closest 函数,获取某个节点附近的bucketSize个邻近节点,保存到result里面。接下来准备进行节点扩散,具体看下面的代码。
for { // ask the alpha closest nodes that we haven't asked yet //下面的代码很巧妙,用alpha 来控制每次并发协程数,避免量太大了,默认为3 //每次并发完成后,从reply管道读取一个,注意,是一个元素出来,那么,怎么控制数量的呢? //答案是 pendingQueries, 这个代表我总共有多少请求需要去问,当前正在进行的,如果没有了,就会break for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ { n := result.entries[i] if !asked[n.ID] {// 没问过的 asked[n.ID] = true //fmt.Printf("asking %+v\n", n ) pendingQueries++ //开一个协程去问一下其他节点 go func() { // Find potential neighbors to bond with //下面是一个查找邻近节点的函数 , 注意到这里是用的net成员,实际上就是udp.go里面的接口了 //代码在p2p/discover/udp.go r, err := tab.net.findnode(n.ID, n.addr(), targetID) if err != nil { //记录失败数 // Bump the failure counter to detect and evacuate non-bonded entries fails := tab.db.findFails(n.ID) + 1 tab.db.updateFindFails(n.ID, fails) log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails) if fails >= maxFindnodeFailures { log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails) tab.delete(n) } } //得到邻近节点后,连接他们,等到结果后放入reply管道里面 //如果节点链接成功,会加入到tab的节点列表的 reply <- tab.bondall(r) }() } } //如果没有正在进行的链接了,只有一种情况,result.entries都扫描完毕了,那么break, //千万不能继续进行reply了,不然会卡死的。从管道里面读取的次数,必须和正在能有的结果数相等 //这代码很妙 if pendingQueries == 0 { // we have asked all closest nodes, stop the search break } // wait for the next reply //reply管道用来获取刚才开的findnode协程的结果,结果为新连接的节点列表 //注意下面的循环是对reply的结果来说的,不是对reply来说的,每次获取一个 for _, n := range <-reply { if n != nil && !seen[n.ID] { //fmt.Printf("result.push %+v\n", n ) seen[n.ID] = true result.push(n, bucketSize) } } pendingQueries-- } //返回找到的其他节点, 此时节点以及由bondall加入到列表里面了 return result.entries }
lookup 最重要的就是上面的循环,遍历result列表的每一个节点,分别参加协程进行findnode的发送,并且得到结果后,调用table.bondall去连接节点,测试其连通性,如果OK会加入到可用节点列表。
上面代码包含了并发度控制,避免协程并发太多。
findnode邻近节点查询
findnode()函数给toid 发送一个 findnodePacket包,询问距离target 比较近的节点有哪些, 注意以太坊的P2P扩散都是查询比较近的节点, 由Table.lookup函数调用,来查找一个节点的邻近节点。
上篇文章降到了discover模块的UDP包发送和结果处理的方式,可以在findnode上再了解一下:
func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) { nodes := make([]*Node, 0, bucketSize) nreceived := 0 //设置回应回调函数,等待类型为neighborsPacket的邻近节点包,如果类型对,就执行回调请求 errc := t.pending(toid, neighborsPacket, func(r interface{}) bool { reply := r.(*neighbors) for _, rn := range reply.Nodes { nreceived++ //得到一个简单的node结构 n, err := t.nodeFromRPC(toaddr, rn) if err != nil { log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err) continue } nodes = append(nodes, n) } return nreceived >= bucketSize }) //上面了一个管道事件,下面开始发送真正的findnode报文,然后进行等待了 t.send(toaddr, findnodePacket, &findnode{ Target: target, Expiration: uint64(time.Now().Add(expiration).Unix()), }) //收到信息后会调用上面的回调函数,然后触发管道,然后返回nodes err := <-errc return nodes, err }
函数调用pending, 第二个参数neighborsPacket 代表我需要等待对方发送neighborsPacket 报文就可以返回。当收到报文后,会调用匿名callback函数处理数据包。
后面函数调用send将findnode包发送出去。那么,对方收到findnode包后怎么处理呢?
处理findnode包的响应
我们知道,udp.readLoop会循环读取UDP包,并且调用handlePacket处理函数处理这个package, 如果是findnode包的话,那么代表对方想要查询邻近节点,就会调用到findnode.handle 函数。
findnode.handle 用来给对方回复我的邻近节点。通过调用t.closest函数来获取邻近节点,然后发给对方。
func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error { //别人发我一个findnode,那么我需要告诉他我的peers了 //这个比较复杂 if expired(req.Expiration) { return errExpired } //我是否已连接上了这个来源节点, 也就是发送这个包的人 if !t.db.hasBond(fromID) { return errUnknownNode } target := crypto.Keccak256Hash(req.Target[:]) t.mutex.Lock() //找一些距离target最近的节点 closest := t.closest(target, bucketSize).entries t.mutex.Unlock()
不过这里需要注意的是,由于UDP有最大报文数限制,所以能够发送的邻近节点数目是有限的,必须做拆包发送,这也是UDP协议发包时必须注意的点。
p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())} var sent bool // Send neighbors in chunks with at most maxNeighbors per packet // to stay below the 1280 byte limit. //扫描这些最近的节点列表,然后一个包一个包的发送给对方 for _, n := range closest { if netutil.CheckRelayIP(from.IP, n.IP) == nil { p.Nodes = append(p.Nodes, nodeToRPC(n)) } if len(p.Nodes) == maxNeighbors { //给对方发送 neighborsPacket 包,里面包含节点列表 t.send(from, neighborsPacket, &p) p.Nodes = p.Nodes[:0] sent = true } } //扫一个尾 if len(p.Nodes) > 0 || !sent { t.send(from, neighborsPacket, &p) } return nil }
处理neighbors返回节点列表数据包
接着上面,收到对端发送给我们的neighborsPacket 后,会调用到neighbors.handle函数,我们需要处理对方发给我们的临接节点列表。通过handleReply来通知主协程loop,进行查找,找到对应的等待的pending,然后调用其回调函数,并且将结果放到对应的errC管道上面。这样也就跟上面的udp.findnode()函数接上了,udp.findnode()函数能得到返回并且拿到对方返回给我们的node列表。
func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error { if expired(req.Expiration) { return errExpired } //收到一个neighbors 包,这个挺重要,我们需要处理对方发给我们的临接节点列表 if !t.handleReply(fromID, neighborsPacket, req) { return errUnsolicitedReply } return nil }
总结一下节点扩散的流程:
- A 发送findnode包给B,并等待neighborsPacket包的结果,在errC管道上,通过udp.findnode() ;
- B收到findnode包后给A发送neighborsPacket给A,通过findnode.handle();
- A收到neighborsPacket 包后,调用neighbors.handle 通知主协程处理这个回复的neighborsPacket;
- 主协程loop通过监听t.gotreply ,得到上述事件后,遍历得到对应的等待结构, 调用callback()后通知errC管道结果;
- A 通过errC得到findnode包的结果后返回对应的邻近节点列表,至此tab.net.findnode()函数完成;
- 查找协程调用reply <- tab.bondall(r)去连接节点;
通过上面的流程,discover模块完成了基于UDP的节点发现协议,查找到新的节点通过tab.add(node) ,将其加入到table.buckets对应的bucket里面,这样就增加了可用节点了。
当然这过程中还包括对超时重连的处理,定时器进行节点状态检查等,具体可以在Table.loop() 和 udp.loop()函数中看到具体的处理逻辑。
近期评论