geth以太坊源码分析-连接其他以太坊peer节点流程
之前的文章讲过P2P模块的UDP节点发现机制以及TCP连接池机制,接下来讲一下连接池后面,连接到一个新的节点后,接下来的流程是什么样的,怎么开始的区块同步,交易同步等。
之前在“geth以太坊源码分析-P2P模块TCP连接池网络通信机制原理” 写过,当TCP连接池跟一个节点进行rlpx握手,建立加密连接后,接下来会触发server.run 函数的addpeer管道传递消息,进而创建一个peer实例,创建一个协程单独处理这个节点,进行业务层面的后续处理。
先来回顾一下下面的代码:
func (srv *Server) run(dialstate dialer) { //连接池管理协程,负责维护TCP连接列表 //在协程里面运行, 开始监听各个信号量,处理peer的增删改 //`~~~~ case c := <-srv.addpeer://握手完成,进行peer的增加和启动监听事件。 // At this point the connection is past the protocol handshake. // Its capabilities are known and the remote identity is verified. err := srv.protoHandshakeChecks(peers, inboundCount, c) if err == nil { // The handshakes are done and it passed all checks. //空跑的话就这两个协议,eth: {Name:eth Version:63 Length:17 Run:0xae30b0 NodeInfo:0xae3300 PeerInfo:0xae3360} {Na me:eth Version:62 Length:8 Run:0xae30b0 NodeInfo:0xae3300 PeerInfo:0xae3360} p := newPeer(c, srv.Protocols) //创建一个peer结构。 fmt.Printf("Protocols %+v\n" , srv.Protocols ) // If message events are enabled, pass the peerFeed // to the peer if srv.EnableMsgEvents { p.events = &srv.peerFeed } name := truncateName(c.name) srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) //创建协程 去处理,增加这个peer go srv.runPeer(p) peers[c.id] = p if p.Inbound() { inboundCount++ } }
可以看出,建立好一个安全连接后,geth会创建一个新的协程来单独处理该连接的消息,通过runPeer()函数。
func (srv *Server) runPeer(p *Peer) { //跟一个peer连接建立完成后,调用这里来启动一个peer的维护,监听工作 //go srv.runPeer(p)协程调用 if srv.newPeerHook != nil { srv.newPeerHook(p) } // broadcast peer add //广播给别人 srv.peerFeed.Send(&PeerEvent{ Type: PeerEventTypeAdd, Peer: p.ID(), }) // run the protocol //真正的peer处理函数,启动,开始进行消息读取,以及协议方面的处理,通知对应的协议层,增加了一个连接了 remoteRequested, err := p.run() // broadcast peer drop srv.peerFeed.Send(&PeerEvent{ Type: PeerEventTypeDrop, Peer: p.ID(), Error: err.Error(), }) srv.delpeer <- peerDrop{p, err, remoteRequested} }
最重要的就是p.run()了,真正的peer处理函数,启动,开始进行消息读取,以及协议方面的处理,通知对应的协议层,增加了一个连接了。Peer.run函数承担了启动对一个节点的读写循环。
当P2P模块对一个节点握手协议处理完后,addpeer队列接收到,然后调用go srv.runPeer(p) 创建协程,最后调用这里, 在协里面维护本节点对于其他节点的链接。怎么维护呢:建立一个事件读写协程,以及管理协程。
func (p *Peer) run() (remoteRequested bool, err error) { //当P2P模块对一个节点握手协议处理完后,addpeer队列接收到,然后调用go srv.runPeer(p) 创建协程,最后调用这里 //在协里面维护本节点对于其他节点的链接。怎么维护呢:建立一个事件读写协程,以及管理协程。 var ( writeStart = make(chan struct{}, 1) writeErr = make(chan error, 1) readErr = make(chan error, 1) reason DiscReason // sent to the peer ) p.wg.Add(2) //启动消息读取协程,简单pingpong自己处理,协议消息发送到proto.in里面, 由对应的协议去读取,一般也是调用特殊的ReadMsg函数从管道 读取内容 go p.readLoop(readErr) go p.pingLoop() // Start all protocol handlers. writeStart <- struct{}{} //下面比较重要,启动对应的协议的协程, 具体看协议,比如eth等 p.startProtocols(writeStart, writeErr)
readLoop 读取其他节点(如eth以太坊节点)的消息
上面的readLoop先来看一下代码:
func (p *Peer) readLoop(errc chan<- error) { //读取对方的数据然后处理, 会格局消息的类型调用对应handle defer p.wg.Done() for { //这里的ReadMsg其实是个接口,对应不同连接的处理函数,比如对于rlpx传输协议,那么使用的是rlpxFrameRW 接口的读取函数,会进行 加解密处理 //读取到消息后就返回 msg, err := p.rw.ReadMsg() if err != nil { errc <- err return } msg.ReceivedAt = time.Now() //调用handle进行消息解析 if err = p.handle(msg); err != nil { errc <- err return } } }
上面的p.rw.ReadMsg() 是关键,调用p.rw 的readMsg函数,这里不容易看出来具体是什么函数。但是仔细从建立连接开始跟过来,就能发现实际上跟Server.Start()函数里面的下面一行代码有关系:
<br />func (srv *Server) Start() (err error) { //startNode-> stack.Start() 里面会调用这里启动p2p服务 if srv.newTransport == nil {// 钩子 //这是以太坊的基于TCP之上的加密握手协议RLPx , UDP不需要 srv.newTransport = newRLPX } } func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error { self := srv.Self() if self == nil { return errors.New("shutdown") } //newTransport 目前使用的是RLPx加密协议,处理函数是newRLPX, 所以对于网络的读写,都是使用rlpxFrameRW接口去完成的 c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)} err := srv.setupConn(c, flags, dialDest) if err != nil { c.close(err) srv.log.Trace("Setting up connection failed", "id", c.id, "err", err) } return err }
ReadMsg 的实现逻辑不是这里的重点,具体代码在: p2p/rlpx.go的rlpxFrameRW.ReadMsg()里面,是个加密的消息读取函数。
读取到消息后调用p.handle()函数。handle 据消息Code处理,简单消息直接处理,应用层消息放入对应的协议的in管道里面。 对于消息的路由,比较有意思的是讲消息放入管道proto.in 但是不知道在哪处理的了,这块具体看看下面的注释就知道了,稍后也会介绍到。
func (p *Peer) handle(msg Msg) error { //根据消息Code处理,简单消息直接处理,应用层消息放入对应的协议的in管道里面 switch { case msg.Code == pingMsg:、 msg.Discard() go SendItems(p.rw, pongMsg) case msg.Code == discMsg: var reason [1]DiscReason // This is the last message. We don't need to discard or // check errors because, the connection will be closed after it. rlp.Decode(msg.Payload, &reason) return reason[0] case msg.Code < baseProtocolLength: // ignore other base protocol messages return msg.Discard() default: // it's a subprotocol message //是一条应用层的协议,那么插入到对应协议的管道里面让其处理即可 //找到对应的协议,比如eth proto, err := p.getProto(msg.Code) if err != nil { return fmt.Errorf("msg code out of range: %v", msg.Code) } //这里有意思的地方在于,如果是应用层消息,就把消息插入到管道中, //因为一般的应用层,是有等待事件循环的,这样将消息就完美的抛给了对方 //而这个proto.in在哪处理呢?答案就是 matchProtocols()里面匹配到协议后 //会创建&protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw} 结构, //里面包含in管道,然后在(p *Peer) run()里面 startProtocols 的时候,会开始对每个协议创建一个协程进行处理的 select { //那么,消费端在哪呢? //答案是: peer.startProtocols()->run()->ProtocolManager.handle()-> ProtocolManager.handleMsg()->ReadMsg() //后者实际上就是 protoRW.ReadMsg(). case proto.in <- msg: return nil case <-p.closed: return io.EOF } } return nil
到这里,P2P.Peer模块完成了对一个刚刚建立rlpx加密协议的链接的数据读取逻辑,最后将读取到的消息通过proto.in <- msg; 放入管道里,交给对应的协议的代码逻辑进行处理。
下面来看一下startProtocols 这个函数做的事情。
startProtocols 开启对应的协议的协程触发节点加入事件
startProtocols是 每个新的peer 会调用这里,启动这个链接后对应的协议处理函数。而协议具体是指什么呢?回到之前看看创建一个Peer的方法代码先:
func newPeer(conn *conn, protocols []Protocol) *Peer { //新建一个peer结构返回 //protomap是我所支持的协议 protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ rw: conn,//对应的网络连接 running: protomap, //支持的协议列表,在peer。run之后,也会开启对应的协议协程 created: mclock.Now(), disc: make(chan DiscReason), protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop closed: make(chan struct{}), log: log.New("id", conn.id, "conn", conn.flags), } return p }
matchProtocols 对rw对应的链接所支持的协议,查询我自己是否支持该协议, 返回能够匹配的上的协议。再往前追溯就到了node/node.go, 因为是在那里设置的本节点所支持的协议名列表的:
func (n *Node) Start() error { // Gather the protocols and start the freshly assembled P2P server for _, service := range services {//收集所有的这些服务的协议名称。 running.Protocols = append(running.Protocols, service.Protocols()...) }
默认情况下,节点只有一个协议,2个版本,具体的成员debug可以得到下面的结果:
{Protocol:{Name:eth Version:63 Length:17 Run:0xae30b0 NodeInfo:0xae3300 PeerInfo:0xae3360} in:0xc4
21d914a0 closed:<nil> wstart:<nil> werr:<nil> offset:16 w:0xc4219adb80}
下面开始看一下startProtocols 代码第一部分:
func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) { //每个新的peer 会调用这里,启动这个链接后对应的协议处理函数 //创建 (len(p.running) 个协程去单独一个个处理对应节点支持的协议 协程, //在readLoop->handle() 函数里面碰到非ping, pong消息后,会将消息插入到proto.in里面,进而被处理 p.wg.Add(len(p.running)) //默认基本就只有一个协议: {Protocol:{Name:eth Version:63 Length:17 Run:0xae30b0 NodeInfo:0xae3300 PeerInfo:0xae3360} in:0xc4 21d914a0 closed:<nil> wstart:<nil> werr:<nil> offset:16 w:0xc4219adb80} for _, proto := range p.running { //proto 是protoRW 结构 , 里面有ReadMsg 和w 的写入函数 proto := proto proto.closed = p.closed proto.wstart = writeStart proto.werr = writeErr var rw MsgReadWriter = proto if p.events != nil { rw = newMsgEventer(rw, p.events, p.ID(), proto.Name) } p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
函数扫描这个节点所支持的协议列表,然后创建单独的携程去处理每个协议的“建立连接后应该做的启动任务”:
//创建协程来处理每一个协议。协议举个例子: //startProtocols : &{Protocol:{Name:eth Version:63 Length:17 Run:0xae3010 NodeInfo:0xae3260 PeerInfo:0xae32c0} in:0xc421d 16ba0 closed:<nil> wstart:<nil> werr:<nil> offset:16 w:0xc421b81720} go func() { //对于eth协议,处理函数在eth/handler.go ->NewProtocolManager里实现的RUN, 是个匿名函数 //run 里面会调用ProtocolManager.handle 函数进行协议握手等 err := proto.Run(p, rw) if err == nil { p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version)) err = errProtocolReturned } else if err != io.EOF { p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err) } p.protoErr <- err p.wg.Done() }() } }
看上面注释可以知道,proto.Run其实是各个协议设置的连接节点函数,对于eth节点,proto的各个成员是在NewProtocolManager 里面设置的匿名函数:
func NewProtocolManager(){ for i, version := range ProtocolVersions { // Skip protocol version if incompatible with the mode of operation if mode == downloader.FastSync && version < eth63 { continue } // Compatible; initialise the sub-protocol version := version // Closure for the run manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{ Name: ProtocolName, Version: version, Length: ProtocolLengths[i], //p2p的Peer.startProtocols 会逐一启动每个协议的Run函数, Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { //来一个新的peer后会调用这里, 创建一个通用peer结构 peer := manager.newPeer(int(version), p, rw) select { //newPeerCh 似乎没什么用? 触发的管道在ProtocolManager.syncer() case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() //调用对应的handle 进行处理 return manager.handle(peer) case <-manager.quitSync: return p2p.DiscQuitting } }, }
上面的代码比较关键, 匿名的Run函数就是来一个新的peer后会调用这里, 创建一个通用peer结构,然后进行对应以太坊协议的处理逻辑。直接来看ProtocolManager.handle() 函数,省去了跟这里不想干的其他内容,看重点:
func (pm *ProtocolManager) handle(p *peer) error { //th/handler.go ->NewProtocolManager里实现的Run 调用这里。 当有一个新的链接建立完成后会调用到这里来进行处理 //基本是进行协议处理,读取消息在p2p/peer.go 的 ReadLoop里面,后者会将消息发送到proto.in里面 //eth层面的协议握手,先发送我的区块链状态信息,然后读取对方返回的状态,设置到p.td, p.head 上面 //rlpx层面的握手已经在p2p/peer 层面完成了 if err := p.Handshake(pm.networkId, td, hash, genesis.Hash()); err != nil { p.Log().Debug("Ethereum handshake failed", "err", err) return err } //来把我这边的未完成的交易发给对方 , 最终会通过txsyncLoop 协程进行发送TxMsg 类型的消息给对方, //对方收到消息后会由handle()->handleMsg()处理 pm.syncTransactions(p) for { //进入消息处理循环,不断读取,处理消息 if err := pm.handleMsg(p); err != nil { p.Log().Debug("Ethereum message handling failed", "err", err) return err } } }
最后面的handleMsg 是关键,前面都是进行以太坊协议握手和交易同步的事情。handleMsg 处理数据的读写。
加入一个eth节点后,会辗转从 p2p.startProtocols 调用到handleMsg, 对于eth, 这个读函数其实是proto 是protoRW 结构 , 里面有ReadMsg 和w 的写入函数 , 实际上读取函数是protoRW.ReadMsg , 后者其实是从 管道rw.in里面读取的数据。这个数据是Peer.readLoop写入的。此时消息其实已经读取到应用层了,放在管道.proto.in里面。
func (pm *ProtocolManager) handleMsg(p *peer) error { //加入一个eth节点后,会辗转从 p2p.startProtocols 调用到这里 // Read the next message from the remote peer, and ensure it's fully consumed //对于eth, 这个读函数其实是proto 是protoRW 结构 , 里面有ReadMsg 和w 的写入函数 , //实际上读取函数是protoRW.ReadMsg , 后者其实是从 管道rw.in里面读取的数据。这个数据是Peer.readLoop写入的 msg, err := p.rw.ReadMsg() //此时消息其实已经读取到应用层了,放在管道.proto.in里面。 if err != nil { return err } // Handle the message depending on its contents switch { case msg.Code == StatusMsg: case msg.Code == GetBlockHeadersMsg: case msg.Code == BlockHeadersMsg: //----其他各种消息类型 }
上面可以看出,每次循环的时候,handleMsg通过调用p.rw.ReadMsg() 来取的一条消息,然后根据消息类型进行不同的处理,比如获取区块头,获取区块信息等。
p.rw.ReadMsg() 函数实际上代码如下:
func (rw *protoRW) ReadMsg() (Msg, error) { //从rw的管道里面去读取消息。这消息是在(p *Peer) handle(msg Msg) 里面设置的。 //而本函数的调用方是谁呢,是peer.startProtocols启动的对应协议的携程来调用, //比如eth.ProtocolManager 里面调用handleMsg 进而读取 select { case msg := <-rw.in: msg.Code -= rw.offset return msg, nil case <-rw.closed: return Msg{}, io.EOF } }
这回整个流程通了,就是readLoop()读取到消息后,将消息放入proto.in 管道里,而对应的 startProtocols 函数就对该连接节点支持的协议列表分别启动了go 协程,分别调用对应协议的Run函数,最后调用到p.rw.ReadMsg() 从管道里读取这条消息。
一句话总结就是readLoop协程读取消息放入对应协议的proto.in管道, proto.Run 协程从管道读取消息并switch-case 处理该条消息进行对应的处理。
@华仔的逆袭
谢谢了,确实是贴错了,ctags跳转选错函数了,多谢啦!
msg, err := p.rw.ReadMsg()应该是调用了protoRW.ReadMsg(),您在上面也说明了,但是下面却粘贴了MsgPipeRW.ReadMsg()的代码。
应该是这一段
func (rw *protoRW) ReadMsg() (Msg, error) {
select {
case msg := <-rw.in:
msg.Code -= rw.offset
return msg, nil
case <-rw.closed:
return Msg{}, io.EOF
}
}
谢谢,拜读了您这一系列的文章,受益匪浅,自己也在做这块的工作,希望向您学习!