geth以太坊源码分析-P2P模块基于UDP的服务发现机制原理(2)
上篇文章分析到了p2p模块是怎么通过UDP协议去测试其他节点的连通性的(geth以太坊源码分析-P2P模块基于UDP的服务发现机制原理(1))。接下来分析一下是怎么发现其他邻近节点的。
节点发现主要是findnode, neighbors消息的处理。
之前文章降到,Table.doRefresh是用来进行节点连接和邻近节点发现的,代码如下:
func (tab *Table) doRefresh(done chan struct{}) {
//本函数在协程loop里面调用个,去连接我附近的所有节点,
//第一步先连接我链接的所有节点,然后再去lookup其他节点如果链接成功就加入到节点列表。
defer close(done)
// Load nodes from the database and insert
// them. This should yield a few previously seen nodes that are
// (hopefully) still alive.
//真正触发连接节点,newTable的时候不会,doRefresh在协程loop里面调用,比较安全
tab.loadSeedNodes(true)
// Run self lookup to discover new neighbor nodes.
//根据最近的节点进行findNode查找邻近节点, 进行一次扩展, 查找其他节点的peers
//首先找距离我自己最近的节点
tab.lookup(tab.self.ID, false)
第一部分我们讲过,后面部分就是调用tab.lookup进行节点发现了。lookup根据最近的节点进行findNode查找邻近节点, 进行一次扩展, 查找其他节点的peers。
节点发现
lookup 函数前面部分,先进行初始化。
func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
var (
target = crypto.Keccak256Hash(targetID[:])
asked = make(map[NodeID]bool)
seen = make(map[NodeID]bool)
reply = make(chan []*Node, alpha)
pendingQueries = 0
result *nodesByDistance
)
// don't query further if we hit ourself.
// unlikely to happen often in practice.
asked[tab.self.ID] = true
for {
tab.mutex.Lock()
// generate initial result set
//调用closest来获取距离target最近的16个节点列表
result = tab.closest(target, bucketSize)
tab.mutex.Unlock()
if len(result.entries) > 0 || !refreshIfEmpty {
break
}
// The result set is empty, all nodes were dropped, refresh.
// We actually wait for the refresh to complete here. The very
// first query will hit this case and run the bootstrapping
// logic.
//完全一个节点都没有,一般不会,如果为空那么调用refresh来触发一次刷新
//实测没进来, 应该时候如果断网或者其他节点都挂掉了,那么此时没有节点连接,那么
//需要阻塞出发一次刷新,期望能够连接上掐节点,同时, refreshIfEmpty=false 只会运行一次循环
//同时,由于 loop()->doRefresh->lookup 这样的调用关系,
//所以,doRefresh每次必须在协程进行调用,否则当前协程就会卡死,永远得不到管道信号
<-tab.refresh()
refreshIfEmpty = false
}
通过tab.closest 函数,获取某个节点附近的bucketSize个邻近节点,保存到result里面。接下来准备进行节点扩散,具体看下面的代码。
for {
// ask the alpha closest nodes that we haven't asked yet
//下面的代码很巧妙,用alpha 来控制每次并发协程数,避免量太大了,默认为3
//每次并发完成后,从reply管道读取一个,注意,是一个元素出来,那么,怎么控制数量的呢?
//答案是 pendingQueries, 这个代表我总共有多少请求需要去问,当前正在进行的,如果没有了,就会break
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
n := result.entries[i]
if !asked[n.ID] {// 没问过的
asked[n.ID] = true
//fmt.Printf("asking %+v\n", n )
pendingQueries++
//开一个协程去问一下其他节点
go func() {
// Find potential neighbors to bond with
//下面是一个查找邻近节点的函数 , 注意到这里是用的net成员,实际上就是udp.go里面的接口了
//代码在p2p/discover/udp.go
r, err := tab.net.findnode(n.ID, n.addr(), targetID)
if err != nil {
//记录失败数
// Bump the failure counter to detect and evacuate non-bonded entries
fails := tab.db.findFails(n.ID) + 1
tab.db.updateFindFails(n.ID, fails)
log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
tab.delete(n)
}
}
//得到邻近节点后,连接他们,等到结果后放入reply管道里面
//如果节点链接成功,会加入到tab的节点列表的
reply <- tab.bondall(r)
}()
}
}
//如果没有正在进行的链接了,只有一种情况,result.entries都扫描完毕了,那么break,
//千万不能继续进行reply了,不然会卡死的。从管道里面读取的次数,必须和正在能有的结果数相等
//这代码很妙
if pendingQueries == 0 {
// we have asked all closest nodes, stop the search
break
}
// wait for the next reply
//reply管道用来获取刚才开的findnode协程的结果,结果为新连接的节点列表
//注意下面的循环是对reply的结果来说的,不是对reply来说的,每次获取一个
for _, n := range <-reply {
if n != nil && !seen[n.ID] {
//fmt.Printf("result.push %+v\n", n )
seen[n.ID] = true
result.push(n, bucketSize)
}
}
pendingQueries--
}
//返回找到的其他节点, 此时节点以及由bondall加入到列表里面了
return result.entries
}
lookup 最重要的就是上面的循环,遍历result列表的每一个节点,分别参加协程进行findnode的发送,并且得到结果后,调用table.bondall去连接节点,测试其连通性,如果OK会加入到可用节点列表。
上面代码包含了并发度控制,避免协程并发太多。
findnode邻近节点查询
findnode()函数给toid 发送一个 findnodePacket包,询问距离target 比较近的节点有哪些, 注意以太坊的P2P扩散都是查询比较近的节点, 由Table.lookup函数调用,来查找一个节点的邻近节点。
上篇文章降到了discover模块的UDP包发送和结果处理的方式,可以在findnode上再了解一下:
func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
nodes := make([]*Node, 0, bucketSize)
nreceived := 0
//设置回应回调函数,等待类型为neighborsPacket的邻近节点包,如果类型对,就执行回调请求
errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
reply := r.(*neighbors)
for _, rn := range reply.Nodes {
nreceived++
//得到一个简单的node结构
n, err := t.nodeFromRPC(toaddr, rn)
if err != nil {
log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err)
continue
}
nodes = append(nodes, n)
}
return nreceived >= bucketSize
})
//上面了一个管道事件,下面开始发送真正的findnode报文,然后进行等待了
t.send(toaddr, findnodePacket, &findnode{
Target: target,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
//收到信息后会调用上面的回调函数,然后触发管道,然后返回nodes
err := <-errc
return nodes, err
}
函数调用pending, 第二个参数neighborsPacket 代表我需要等待对方发送neighborsPacket 报文就可以返回。当收到报文后,会调用匿名callback函数处理数据包。
后面函数调用send将findnode包发送出去。那么,对方收到findnode包后怎么处理呢?
处理findnode包的响应
我们知道,udp.readLoop会循环读取UDP包,并且调用handlePacket处理函数处理这个package, 如果是findnode包的话,那么代表对方想要查询邻近节点,就会调用到findnode.handle 函数。
findnode.handle 用来给对方回复我的邻近节点。通过调用t.closest函数来获取邻近节点,然后发给对方。
func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
//别人发我一个findnode,那么我需要告诉他我的peers了
//这个比较复杂
if expired(req.Expiration) {
return errExpired
}
//我是否已连接上了这个来源节点, 也就是发送这个包的人
if !t.db.hasBond(fromID) {
return errUnknownNode
}
target := crypto.Keccak256Hash(req.Target[:])
t.mutex.Lock()
//找一些距离target最近的节点
closest := t.closest(target, bucketSize).entries
t.mutex.Unlock()
不过这里需要注意的是,由于UDP有最大报文数限制,所以能够发送的邻近节点数目是有限的,必须做拆包发送,这也是UDP协议发包时必须注意的点。
p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
var sent bool
// Send neighbors in chunks with at most maxNeighbors per packet
// to stay below the 1280 byte limit.
//扫描这些最近的节点列表,然后一个包一个包的发送给对方
for _, n := range closest {
if netutil.CheckRelayIP(from.IP, n.IP) == nil {
p.Nodes = append(p.Nodes, nodeToRPC(n))
}
if len(p.Nodes) == maxNeighbors {
//给对方发送 neighborsPacket 包,里面包含节点列表
t.send(from, neighborsPacket, &p)
p.Nodes = p.Nodes[:0]
sent = true
}
}
//扫一个尾
if len(p.Nodes) > 0 || !sent {
t.send(from, neighborsPacket, &p)
}
return nil
}
处理neighbors返回节点列表数据包
接着上面,收到对端发送给我们的neighborsPacket 后,会调用到neighbors.handle函数,我们需要处理对方发给我们的临接节点列表。通过handleReply来通知主协程loop,进行查找,找到对应的等待的pending,然后调用其回调函数,并且将结果放到对应的errC管道上面。这样也就跟上面的udp.findnode()函数接上了,udp.findnode()函数能得到返回并且拿到对方返回给我们的node列表。
func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
//收到一个neighbors 包,这个挺重要,我们需要处理对方发给我们的临接节点列表
if !t.handleReply(fromID, neighborsPacket, req) {
return errUnsolicitedReply
}
return nil
}
总结一下节点扩散的流程:
- A 发送findnode包给B,并等待neighborsPacket包的结果,在errC管道上,通过udp.findnode() ;
- B收到findnode包后给A发送neighborsPacket给A,通过findnode.handle();
- A收到neighborsPacket 包后,调用neighbors.handle 通知主协程处理这个回复的neighborsPacket;
- 主协程loop通过监听t.gotreply ,得到上述事件后,遍历得到对应的等待结构, 调用callback()后通知errC管道结果;
- A 通过errC得到findnode包的结果后返回对应的邻近节点列表,至此tab.net.findnode()函数完成;
- 查找协程调用reply <- tab.bondall(r)去连接节点;
通过上面的流程,discover模块完成了基于UDP的节点发现协议,查找到新的节点通过tab.add(node) ,将其加入到table.buckets对应的bucket里面,这样就增加了可用节点了。
当然这过程中还包括对超时重连的处理,定时器进行节点状态检查等,具体可以在Table.loop() 和 udp.loop()函数中看到具体的处理逻辑。

近期评论