首页 > C/C++, geth > geth以太坊源码分析-P2P模块基于UDP的服务发现机制原理(2)

geth以太坊源码分析-P2P模块基于UDP的服务发现机制原理(2)

2018年6月25日 发表评论 阅读评论 29134次阅读    

上篇文章分析到了p2p模块是怎么通过UDP协议去测试其他节点的连通性的(geth以太坊源码分析-P2P模块基于UDP的服务发现机制原理(1))。接下来分析一下是怎么发现其他邻近节点的。
节点发现主要是findnode, neighbors消息的处理。

之前文章降到,Table.doRefresh是用来进行节点连接和邻近节点发现的,代码如下:

func (tab *Table) doRefresh(done chan struct{}) {
    //本函数在协程loop里面调用个,去连接我附近的所有节点, 
    //第一步先连接我链接的所有节点,然后再去lookup其他节点如果链接成功就加入到节点列表。
    defer close(done)

    // Load nodes from the database and insert
    // them. This should yield a few previously seen nodes that are
    // (hopefully) still alive.
    //真正触发连接节点,newTable的时候不会,doRefresh在协程loop里面调用,比较安全
    tab.loadSeedNodes(true)

    // Run self lookup to discover new neighbor nodes.
    //根据最近的节点进行findNode查找邻近节点, 进行一次扩展, 查找其他节点的peers
    //首先找距离我自己最近的节点
    tab.lookup(tab.self.ID, false)

第一部分我们讲过,后面部分就是调用tab.lookup进行节点发现了。lookup根据最近的节点进行findNode查找邻近节点, 进行一次扩展, 查找其他节点的peers。


节点发现

lookup 函数前面部分,先进行初始化。

func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
    var (
        target         = crypto.Keccak256Hash(targetID[:])
        asked          = make(map[NodeID]bool)
        seen           = make(map[NodeID]bool)
        reply          = make(chan []*Node, alpha)
        pendingQueries = 0
        result         *nodesByDistance
    )
    // don't query further if we hit ourself.
    // unlikely to happen often in practice.
    asked[tab.self.ID] = true

    for {
        tab.mutex.Lock()
        // generate initial result set
        //调用closest来获取距离target最近的16个节点列表
        result = tab.closest(target, bucketSize)
        tab.mutex.Unlock()
        if len(result.entries) > 0 || !refreshIfEmpty {
            break
        }
        // The result set is empty, all nodes were dropped, refresh.
        // We actually wait for the refresh to complete here. The very
        // first query will hit this case and run the bootstrapping
        // logic.
        //完全一个节点都没有,一般不会,如果为空那么调用refresh来触发一次刷新
        //实测没进来, 应该时候如果断网或者其他节点都挂掉了,那么此时没有节点连接,那么 
        //需要阻塞出发一次刷新,期望能够连接上掐节点,同时, refreshIfEmpty=false 只会运行一次循环
        //同时,由于 loop()->doRefresh->lookup 这样的调用关系,
        //所以,doRefresh每次必须在协程进行调用,否则当前协程就会卡死,永远得不到管道信号
        <-tab.refresh()
        refreshIfEmpty = false
    }

通过tab.closest 函数,获取某个节点附近的bucketSize个邻近节点,保存到result里面。接下来准备进行节点扩散,具体看下面的代码。

    for {
        // ask the alpha closest nodes that we haven't asked yet
        //下面的代码很巧妙,用alpha 来控制每次并发协程数,避免量太大了,默认为3 
        //每次并发完成后,从reply管道读取一个,注意,是一个元素出来,那么,怎么控制数量的呢?
        //答案是 pendingQueries, 这个代表我总共有多少请求需要去问,当前正在进行的,如果没有了,就会break
        for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
            n := result.entries[i]
            if !asked[n.ID] {// 没问过的
                asked[n.ID] = true
                //fmt.Printf("asking %+v\n", n )
                pendingQueries++
                //开一个协程去问一下其他节点
                go func() {
                    // Find potential neighbors to bond with
                    //下面是一个查找邻近节点的函数 , 注意到这里是用的net成员,实际上就是udp.go里面的接口了
                    //代码在p2p/discover/udp.go
                    r, err := tab.net.findnode(n.ID, n.addr(), targetID)
                    if err != nil {
                        //记录失败数
                        // Bump the failure counter to detect and evacuate non-bonded entries
                        fails := tab.db.findFails(n.ID) + 1
                        tab.db.updateFindFails(n.ID, fails)
                        log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)

                        if fails >= maxFindnodeFailures {
                            log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
                            tab.delete(n)
                        }
                    }
                    //得到邻近节点后,连接他们,等到结果后放入reply管道里面
                    //如果节点链接成功,会加入到tab的节点列表的
                    reply <- tab.bondall(r)
                }()
            }
        }
                //如果没有正在进行的链接了,只有一种情况,result.entries都扫描完毕了,那么break, 
        //千万不能继续进行reply了,不然会卡死的。从管道里面读取的次数,必须和正在能有的结果数相等
        //这代码很妙
        if pendingQueries == 0 {
            // we have asked all closest nodes, stop the search
            break
        }
        // wait for the next reply
        //reply管道用来获取刚才开的findnode协程的结果,结果为新连接的节点列表
        //注意下面的循环是对reply的结果来说的,不是对reply来说的,每次获取一个
        for _, n := range <-reply {
            if n != nil && !seen[n.ID] {
                //fmt.Printf("result.push %+v\n", n )
                seen[n.ID] = true
                result.push(n, bucketSize)
            }
        }
        pendingQueries--
    }
    //返回找到的其他节点, 此时节点以及由bondall加入到列表里面了
    return result.entries
}

lookup 最重要的就是上面的循环,遍历result列表的每一个节点,分别参加协程进行findnode的发送,并且得到结果后,调用table.bondall去连接节点,测试其连通性,如果OK会加入到可用节点列表。
上面代码包含了并发度控制,避免协程并发太多。


findnode邻近节点查询

findnode()函数给toid 发送一个 findnodePacket包,询问距离target 比较近的节点有哪些, 注意以太坊的P2P扩散都是查询比较近的节点, 由Table.lookup函数调用,来查找一个节点的邻近节点。
上篇文章降到了discover模块的UDP包发送和结果处理的方式,可以在findnode上再了解一下:

func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
    nodes := make([]*Node, 0, bucketSize)
    nreceived := 0

    //设置回应回调函数,等待类型为neighborsPacket的邻近节点包,如果类型对,就执行回调请求
    errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
        reply := r.(*neighbors)
        for _, rn := range reply.Nodes {
            nreceived++
            //得到一个简单的node结构
            n, err := t.nodeFromRPC(toaddr, rn)
            if err != nil {
                log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err)
                continue
            }
            nodes = append(nodes, n)
        }
        return nreceived >= bucketSize
    })
    //上面了一个管道事件,下面开始发送真正的findnode报文,然后进行等待了
    t.send(toaddr, findnodePacket, &findnode{
        Target:     target,
        Expiration: uint64(time.Now().Add(expiration).Unix()),
    })
    //收到信息后会调用上面的回调函数,然后触发管道,然后返回nodes
    err := <-errc

    return nodes, err
}

函数调用pending, 第二个参数neighborsPacket 代表我需要等待对方发送neighborsPacket 报文就可以返回。当收到报文后,会调用匿名callback函数处理数据包。
后面函数调用send将findnode包发送出去。那么,对方收到findnode包后怎么处理呢?


处理findnode包的响应

我们知道,udp.readLoop会循环读取UDP包,并且调用handlePacket处理函数处理这个package, 如果是findnode包的话,那么代表对方想要查询邻近节点,就会调用到findnode.handle 函数。
findnode.handle 用来给对方回复我的邻近节点。通过调用t.closest函数来获取邻近节点,然后发给对方。

func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
    //别人发我一个findnode,那么我需要告诉他我的peers了
    //这个比较复杂
    if expired(req.Expiration) {
        return errExpired
    }
    //我是否已连接上了这个来源节点, 也就是发送这个包的人
    if !t.db.hasBond(fromID) {
        return errUnknownNode
    }
    target := crypto.Keccak256Hash(req.Target[:])
    t.mutex.Lock()
    //找一些距离target最近的节点
    closest := t.closest(target, bucketSize).entries
    t.mutex.Unlock()

不过这里需要注意的是,由于UDP有最大报文数限制,所以能够发送的邻近节点数目是有限的,必须做拆包发送,这也是UDP协议发包时必须注意的点。

    p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
    var sent bool
    // Send neighbors in chunks with at most maxNeighbors per packet
    // to stay below the 1280 byte limit.
    //扫描这些最近的节点列表,然后一个包一个包的发送给对方
    for _, n := range closest {
        if netutil.CheckRelayIP(from.IP, n.IP) == nil {
            p.Nodes = append(p.Nodes, nodeToRPC(n))
        }
        if len(p.Nodes) == maxNeighbors {
            //给对方发送 neighborsPacket 包,里面包含节点列表
            t.send(from, neighborsPacket, &p)
            p.Nodes = p.Nodes[:0]
            sent = true
        }
    }
    //扫一个尾
    if len(p.Nodes) > 0 || !sent {
        t.send(from, neighborsPacket, &p)
    }
    return nil
}

处理neighbors返回节点列表数据包

接着上面,收到对端发送给我们的neighborsPacket 后,会调用到neighbors.handle函数,我们需要处理对方发给我们的临接节点列表。通过handleReply来通知主协程loop,进行查找,找到对应的等待的pending,然后调用其回调函数,并且将结果放到对应的errC管道上面。这样也就跟上面的udp.findnode()函数接上了,udp.findnode()函数能得到返回并且拿到对方返回给我们的node列表。

func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
    if expired(req.Expiration) {
        return errExpired
    }
    //收到一个neighbors 包,这个挺重要,我们需要处理对方发给我们的临接节点列表
    if !t.handleReply(fromID, neighborsPacket, req) {
        return errUnsolicitedReply
    }
    return nil
}

总结一下节点扩散的流程:

  1. A 发送findnode包给B,并等待neighborsPacket包的结果,在errC管道上,通过udp.findnode() ;
  2. B收到findnode包后给A发送neighborsPacket给A,通过findnode.handle();
  3. A收到neighborsPacket 包后,调用neighbors.handle 通知主协程处理这个回复的neighborsPacket;
  4. 主协程loop通过监听t.gotreply ,得到上述事件后,遍历得到对应的等待结构, 调用callback()后通知errC管道结果;
  5. A 通过errC得到findnode包的结果后返回对应的邻近节点列表,至此tab.net.findnode()函数完成;
  6. 查找协程调用reply <- tab.bondall(r)去连接节点;

通过上面的流程,discover模块完成了基于UDP的节点发现协议,查找到新的节点通过tab.add(node) ,将其加入到table.buckets对应的bucket里面,这样就增加了可用节点了。
当然这过程中还包括对超时重连的处理,定时器进行节点状态检查等,具体可以在Table.loop() 和 udp.loop()函数中看到具体的处理逻辑。

Share
分类: C/C++, geth 标签: , ,
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。