首页 > geth, geth > geth以太坊源码分析-连接其他以太坊peer节点流程

geth以太坊源码分析-连接其他以太坊peer节点流程

2018年7月15日 发表评论 阅读评论 34506次阅读    

之前的文章讲过P2P模块的UDP节点发现机制以及TCP连接池机制,接下来讲一下连接池后面,连接到一个新的节点后,接下来的流程是什么样的,怎么开始的区块同步,交易同步等。

之前在“geth以太坊源码分析-P2P模块TCP连接池网络通信机制原理” 写过,当TCP连接池跟一个节点进行rlpx握手,建立加密连接后,接下来会触发server.run 函数的addpeer管道传递消息,进而创建一个peer实例,创建一个协程单独处理这个节点,进行业务层面的后续处理。
先来回顾一下下面的代码:

func (srv *Server) run(dialstate dialer) {
    //连接池管理协程,负责维护TCP连接列表
    //在协程里面运行, 开始监听各个信号量,处理peer的增删改
    //`~~~~
        case c := <-srv.addpeer://握手完成,进行peer的增加和启动监听事件。
            // At this point the connection is past the protocol handshake.
            // Its capabilities are known and the remote identity is verified.
            err := srv.protoHandshakeChecks(peers, inboundCount, c)
            if err == nil {
                // The handshakes are done and it passed all checks.
                //空跑的话就这两个协议,eth: {Name:eth Version:63 Length:17 Run:0xae30b0 NodeInfo:0xae3300 PeerInfo:0xae3360} {Na
me:eth Version:62 Length:8 Run:0xae30b0 NodeInfo:0xae3300 PeerInfo:0xae3360}
                p := newPeer(c, srv.Protocols) //创建一个peer结构。
                fmt.Printf("Protocols %+v\n" , srv.Protocols )
                // If message events are enabled, pass the peerFeed
                // to the peer
                if srv.EnableMsgEvents {
                    p.events = &srv.peerFeed
                }
                name := truncateName(c.name)
                srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
                //创建协程 去处理,增加这个peer
                go srv.runPeer(p)
                peers[c.id] = p
                if p.Inbound() {
                    inboundCount++
                }
            }

可以看出,建立好一个安全连接后,geth会创建一个新的协程来单独处理该连接的消息,通过runPeer()函数。

func (srv *Server) runPeer(p *Peer) {
    //跟一个peer连接建立完成后,调用这里来启动一个peer的维护,监听工作
    //go srv.runPeer(p)协程调用
    if srv.newPeerHook != nil {
        srv.newPeerHook(p)
    }

    // broadcast peer add
    //广播给别人
    srv.peerFeed.Send(&PeerEvent{
        Type: PeerEventTypeAdd,
        Peer: p.ID(),
    })

    // run the protocol
    //真正的peer处理函数,启动,开始进行消息读取,以及协议方面的处理,通知对应的协议层,增加了一个连接了
    remoteRequested, err := p.run()

    // broadcast peer drop
    srv.peerFeed.Send(&PeerEvent{
        Type:  PeerEventTypeDrop,
        Peer:  p.ID(),
        Error: err.Error(),
    })
    srv.delpeer <- peerDrop{p, err, remoteRequested}
}

最重要的就是p.run()了,真正的peer处理函数,启动,开始进行消息读取,以及协议方面的处理,通知对应的协议层,增加了一个连接了。Peer.run函数承担了启动对一个节点的读写循环。
当P2P模块对一个节点握手协议处理完后,addpeer队列接收到,然后调用go srv.runPeer(p) 创建协程,最后调用这里, 在协里面维护本节点对于其他节点的链接。怎么维护呢:建立一个事件读写协程,以及管理协程。

func (p *Peer) run() (remoteRequested bool, err error) {
    //当P2P模块对一个节点握手协议处理完后,addpeer队列接收到,然后调用go srv.runPeer(p) 创建协程,最后调用这里                   
    //在协里面维护本节点对于其他节点的链接。怎么维护呢:建立一个事件读写协程,以及管理协程。
    var (
        writeStart = make(chan struct{}, 1)
        writeErr   = make(chan error, 1)
        readErr    = make(chan error, 1)
        reason     DiscReason // sent to the peer
    )  
    p.wg.Add(2)
    //启动消息读取协程,简单pingpong自己处理,协议消息发送到proto.in里面, 由对应的协议去读取,一般也是调用特殊的ReadMsg函数从管道
读取内容
    go p.readLoop(readErr)
    go p.pingLoop()

    // Start all protocol handlers.
    writeStart <- struct{}{}
    //下面比较重要,启动对应的协议的协程, 具体看协议,比如eth等
    p.startProtocols(writeStart, writeErr)

readLoop 读取其他节点(如eth以太坊节点)的消息

上面的readLoop先来看一下代码:

func (p *Peer) readLoop(errc chan<- error) {
    //读取对方的数据然后处理, 会格局消息的类型调用对应handle
    defer p.wg.Done()
    for {
        //这里的ReadMsg其实是个接口,对应不同连接的处理函数,比如对于rlpx传输协议,那么使用的是rlpxFrameRW 接口的读取函数,会进行
加解密处理
        //读取到消息后就返回
        msg, err := p.rw.ReadMsg()
        if err != nil {
            errc <- err
            return
        }
        msg.ReceivedAt = time.Now()
        //调用handle进行消息解析
        if err = p.handle(msg); err != nil {
            errc <- err
            return
        }
    }
}

上面的p.rw.ReadMsg() 是关键,调用p.rw 的readMsg函数,这里不容易看出来具体是什么函数。但是仔细从建立连接开始跟过来,就能发现实际上跟Server.Start()函数里面的下面一行代码有关系:

<br />func (srv *Server) Start() (err error) {
    //startNode-> stack.Start() 里面会调用这里启动p2p服务
        if srv.newTransport == nil {// 钩子
        //这是以太坊的基于TCP之上的加密握手协议RLPx , UDP不需要
        srv.newTransport = newRLPX
    }
}
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
    self := srv.Self()
    if self == nil {
        return errors.New("shutdown")
    }
    //newTransport 目前使用的是RLPx加密协议,处理函数是newRLPX, 所以对于网络的读写,都是使用rlpxFrameRW接口去完成的             
    c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
    err := srv.setupConn(c, flags, dialDest)
    if err != nil {
        c.close(err)
        srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
    }
    return err
}

ReadMsg 的实现逻辑不是这里的重点,具体代码在: p2p/rlpx.go的rlpxFrameRW.ReadMsg()里面,是个加密的消息读取函数。
读取到消息后调用p.handle()函数。handle 据消息Code处理,简单消息直接处理,应用层消息放入对应的协议的in管道里面。 对于消息的路由,比较有意思的是讲消息放入管道proto.in 但是不知道在哪处理的了,这块具体看看下面的注释就知道了,稍后也会介绍到。

func (p *Peer) handle(msg Msg) error {
    //根据消息Code处理,简单消息直接处理,应用层消息放入对应的协议的in管道里面
    switch {
    case msg.Code == pingMsg:、
        msg.Discard()
        go SendItems(p.rw, pongMsg)
    case msg.Code == discMsg:
        var reason [1]DiscReason
        // This is the last message. We don't need to discard or
        // check errors because, the connection will be closed after it.
        rlp.Decode(msg.Payload, &reason)
        return reason[0]
    case msg.Code < baseProtocolLength:
        // ignore other base protocol messages
        return msg.Discard()
    default:
        // it's a subprotocol message
        //是一条应用层的协议,那么插入到对应协议的管道里面让其处理即可
        //找到对应的协议,比如eth
        proto, err := p.getProto(msg.Code)
        if err != nil {
            return fmt.Errorf("msg code out of range: %v", msg.Code)
        }
        //这里有意思的地方在于,如果是应用层消息,就把消息插入到管道中,
        //因为一般的应用层,是有等待事件循环的,这样将消息就完美的抛给了对方
        //而这个proto.in在哪处理呢?答案就是 matchProtocols()里面匹配到协议后
        //会创建&protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw} 结构,
        //里面包含in管道,然后在(p *Peer) run()里面 startProtocols 的时候,会开始对每个协议创建一个协程进行处理的
        select {
            //那么,消费端在哪呢?
            //答案是: peer.startProtocols()->run()->ProtocolManager.handle()-> ProtocolManager.handleMsg()->ReadMsg()
            //后者实际上就是 protoRW.ReadMsg().
        case proto.in <- msg:
            return nil
        case <-p.closed:
            return io.EOF
        }
    }
    return nil

到这里,P2P.Peer模块完成了对一个刚刚建立rlpx加密协议的链接的数据读取逻辑,最后将读取到的消息通过proto.in <- msg; 放入管道里,交给对应的协议的代码逻辑进行处理。
下面来看一下startProtocols 这个函数做的事情。

startProtocols 开启对应的协议的协程触发节点加入事件

startProtocols是 每个新的peer 会调用这里,启动这个链接后对应的协议处理函数。而协议具体是指什么呢?回到之前看看创建一个Peer的方法代码先:

func newPeer(conn *conn, protocols []Protocol) *Peer {
    //新建一个peer结构返回 
    //protomap是我所支持的协议
    protomap := matchProtocols(protocols, conn.caps, conn)
    p := &amp;Peer{
        rw:       conn,//对应的网络连接
        running:  protomap, //支持的协议列表,在peer。run之后,也会开启对应的协议协程
        created:  mclock.Now(),
        disc:     make(chan DiscReason),
        protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
        closed:   make(chan struct{}),
        log:      log.New(&quot;id&quot;, conn.id, &quot;conn&quot;, conn.flags),
    }
    return p
}

matchProtocols 对rw对应的链接所支持的协议,查询我自己是否支持该协议, 返回能够匹配的上的协议。再往前追溯就到了node/node.go, 因为是在那里设置的本节点所支持的协议名列表的:

func (n *Node) Start() error {
        // Gather the protocols and start the freshly assembled P2P server
    for _, service := range services {//收集所有的这些服务的协议名称。
        running.Protocols = append(running.Protocols, service.Protocols()...)
    }

默认情况下,节点只有一个协议,2个版本,具体的成员debug可以得到下面的结果:
{Protocol:{Name:eth Version:63 Length:17 Run:0xae30b0 NodeInfo:0xae3300 PeerInfo:0xae3360} in:0xc4
21d914a0 closed:<nil> wstart:<nil> werr:<nil> offset:16 w:0xc4219adb80}

下面开始看一下startProtocols 代码第一部分:

func (p *Peer) startProtocols(writeStart &lt;-chan struct{}, writeErr chan&lt;- error) {
    //每个新的peer 会调用这里,启动这个链接后对应的协议处理函数
    //创建 (len(p.running) 个协程去单独一个个处理对应节点支持的协议 协程,
    //在readLoop-&gt;handle() 函数里面碰到非ping, pong消息后,会将消息插入到proto.in里面,进而被处理
    p.wg.Add(len(p.running))
    //默认基本就只有一个协议: {Protocol:{Name:eth Version:63 Length:17 Run:0xae30b0 NodeInfo:0xae3300 PeerInfo:0xae3360} in:0xc4
21d914a0 closed:&lt;nil&gt; wstart:&lt;nil&gt; werr:&lt;nil&gt; offset:16 w:0xc4219adb80}
    for _, proto := range p.running {
        //proto 是protoRW 结构 , 里面有ReadMsg 和w 的写入函数
        proto := proto
        proto.closed = p.closed
        proto.wstart = writeStart
        proto.werr = writeErr
        var rw MsgReadWriter = proto
        if p.events != nil {
            rw = newMsgEventer(rw, p.events, p.ID(), proto.Name)
        }
        p.log.Trace(fmt.Sprintf(&quot;Starting protocol %s/%d&quot;, proto.Name, proto.Version))

函数扫描这个节点所支持的协议列表,然后创建单独的携程去处理每个协议的“建立连接后应该做的启动任务”:

        //创建协程来处理每一个协议。协议举个例子:
        //startProtocols : &amp;{Protocol:{Name:eth Version:63 Length:17 Run:0xae3010 NodeInfo:0xae3260 PeerInfo:0xae32c0} in:0xc421d
16ba0 closed:&lt;nil&gt; wstart:&lt;nil&gt; werr:&lt;nil&gt; offset:16 w:0xc421b81720}
        go func() {
            //对于eth协议,处理函数在eth/handler.go -&gt;NewProtocolManager里实现的RUN, 是个匿名函数
            //run 里面会调用ProtocolManager.handle 函数进行协议握手等
            err := proto.Run(p, rw)
            if err == nil {
                p.log.Trace(fmt.Sprintf(&quot;Protocol %s/%d returned&quot;, proto.Name, proto.Version))
                err = errProtocolReturned
            } else if err != io.EOF {
                p.log.Trace(fmt.Sprintf(&quot;Protocol %s/%d failed&quot;, proto.Name, proto.Version), &quot;err&quot;, err)
            }
            p.protoErr &lt;- err
            p.wg.Done()
        }()
    }
}

看上面注释可以知道,proto.Run其实是各个协议设置的连接节点函数,对于eth节点,proto的各个成员是在NewProtocolManager 里面设置的匿名函数:

func NewProtocolManager(){
    for i, version := range ProtocolVersions {
        // Skip protocol version if incompatible with the mode of operation
        if mode == downloader.FastSync &amp;&amp; version &lt; eth63 {
            continue
        }
        // Compatible; initialise the sub-protocol
        version := version // Closure for the run
        manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
            Name:    ProtocolName,
            Version: version,
            Length:  ProtocolLengths[i],
            //p2p的Peer.startProtocols 会逐一启动每个协议的Run函数, 
            Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
                //来一个新的peer后会调用这里, 创建一个通用peer结构
                peer := manager.newPeer(int(version), p, rw)
                select {
                    //newPeerCh 似乎没什么用? 触发的管道在ProtocolManager.syncer()
                case manager.newPeerCh &lt;- peer:
                    manager.wg.Add(1)
                    defer manager.wg.Done()
                    //调用对应的handle 进行处理
                    return manager.handle(peer)
                case &lt;-manager.quitSync:
                    return p2p.DiscQuitting
                }
            },
}

上面的代码比较关键, 匿名的Run函数就是来一个新的peer后会调用这里, 创建一个通用peer结构,然后进行对应以太坊协议的处理逻辑。直接来看ProtocolManager.handle() 函数,省去了跟这里不想干的其他内容,看重点:

func (pm *ProtocolManager) handle(p *peer) error {
    //th/handler.go -&gt;NewProtocolManager里实现的Run 调用这里。 当有一个新的链接建立完成后会调用到这里来进行处理
    //基本是进行协议处理,读取消息在p2p/peer.go 的 ReadLoop里面,后者会将消息发送到proto.in里面

    //eth层面的协议握手,先发送我的区块链状态信息,然后读取对方返回的状态,设置到p.td, p.head 上面
    //rlpx层面的握手已经在p2p/peer 层面完成了
    if err := p.Handshake(pm.networkId, td, hash, genesis.Hash()); err != nil {
        p.Log().Debug(&quot;Ethereum handshake failed&quot;, &quot;err&quot;, err)
        return err
    }
    //来把我这边的未完成的交易发给对方 , 最终会通过txsyncLoop 协程进行发送TxMsg 类型的消息给对方,
    //对方收到消息后会由handle()-&gt;handleMsg()处理
    pm.syncTransactions(p)
    for {
        //进入消息处理循环,不断读取,处理消息
        if err := pm.handleMsg(p); err != nil {
            p.Log().Debug(&quot;Ethereum message handling failed&quot;, &quot;err&quot;, err)
            return err
        }
    }
}

最后面的handleMsg 是关键,前面都是进行以太坊协议握手和交易同步的事情。handleMsg 处理数据的读写。
加入一个eth节点后,会辗转从 p2p.startProtocols 调用到handleMsg, 对于eth, 这个读函数其实是proto 是protoRW 结构 , 里面有ReadMsg 和w 的写入函数 , 实际上读取函数是protoRW.ReadMsg , 后者其实是从 管道rw.in里面读取的数据。这个数据是Peer.readLoop写入的。此时消息其实已经读取到应用层了,放在管道.proto.in里面。

func (pm *ProtocolManager) handleMsg(p *peer) error {
    //加入一个eth节点后,会辗转从 p2p.startProtocols 调用到这里
    // Read the next message from the remote peer, and ensure it&#039;s fully consumed
    //对于eth, 这个读函数其实是proto 是protoRW 结构 , 里面有ReadMsg 和w 的写入函数 ,
    //实际上读取函数是protoRW.ReadMsg , 后者其实是从 管道rw.in里面读取的数据。这个数据是Peer.readLoop写入的
    msg, err := p.rw.ReadMsg()  //此时消息其实已经读取到应用层了,放在管道.proto.in里面。
    if err != nil {
        return err
    }
    // Handle the message depending on its contents
    switch {
    case msg.Code == StatusMsg:
    case msg.Code == GetBlockHeadersMsg:
    case msg.Code == BlockHeadersMsg:
    //----其他各种消息类型
    }

上面可以看出,每次循环的时候,handleMsg通过调用p.rw.ReadMsg() 来取的一条消息,然后根据消息类型进行不同的处理,比如获取区块头,获取区块信息等。
p.rw.ReadMsg() 函数实际上代码如下:

func (rw *protoRW) ReadMsg() (Msg, error) {
    //从rw的管道里面去读取消息。这消息是在(p *Peer) handle(msg Msg)  里面设置的。
    //而本函数的调用方是谁呢,是peer.startProtocols启动的对应协议的携程来调用,
    //比如eth.ProtocolManager 里面调用handleMsg 进而读取
    select {
    case msg := &lt;-rw.in:
        msg.Code -= rw.offset
        return msg, nil 
    case &lt;-rw.closed:
        return Msg{}, io.EOF
    }   
}

这回整个流程通了,就是readLoop()读取到消息后,将消息放入proto.in 管道里,而对应的 startProtocols 函数就对该连接节点支持的协议列表分别启动了go 协程,分别调用对应协议的Run函数,最后调用到p.rw.ReadMsg() 从管道里读取这条消息。

一句话总结就是readLoop协程读取消息放入对应协议的proto.in管道, proto.Run 协程从管道读取消息并switch-case 处理该条消息进行对应的处理。

Share
分类: geth, geth 标签: , ,
  1. kulv
    2018年7月27日17:27 | #1

    @华仔的逆袭
    谢谢了,确实是贴错了,ctags跳转选错函数了,多谢啦!

  2. 华仔的逆袭
    2018年7月25日14:59 | #2

    msg, err := p.rw.ReadMsg()应该是调用了protoRW.ReadMsg(),您在上面也说明了,但是下面却粘贴了MsgPipeRW.ReadMsg()的代码。
    应该是这一段
    func (rw *protoRW) ReadMsg() (Msg, error) {
    select {
    case msg := <-rw.in:
    msg.Code -= rw.offset
    return msg, nil
    case <-rw.closed:
    return Msg{}, io.EOF
    }
    }

    谢谢,拜读了您这一系列的文章,受益匪浅,自己也在做这块的工作,希望向您学习!

  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。