geth以太坊源码分析-P2P模块TCP连接池网络通信机制原理
geth的P2P模块有2个重要的部分:基于UDP的节点发现模块 以及 TCP数据传输连接池模块。
之前讲过节点发现部分,用来根据设置的少了BootstrapNodes节点来发现更多全网的其他节点,这部分只是发现节点并找出其中可以ping通的节点,但是还没有进行使用,还没建立TCP连接进行数据传输,协议处理等。
这里分析一下P2P系统的TCP连接池是怎么建立的,以及是怎么跟其他节点通信的。
启动TCP监听,维护协程 服务
之前知道,在P2P的Server.start()会启动 discover.ListenUDP 开启后台协程进行UDP监听,不断读取数据包进行处理,从而实现UDP协议的节点发现逻辑。之后,便会开始进行TCP层的节点连接池功能,开始监听其他节点的请求,进行加密握手,以及不断去做连接池维护。
来看看 P2P的Server.start()后面部分代码:
dynPeers := srv.maxDialedConns() //新建一个dialstate结构返回 , StaticNodes 会直接加到srv.static[]数组里面 //dialer是用来接听到上面的一堆管道事件后,管理nodes列表的, 发送TCP连接的 dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict) // handshake srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.Public Key)} for _, p := range srv.Protocols { srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap()) } // listen/dial if srv.ListenAddr != "" { //开始监听TCP请求, TCP端口是需要加密的,传输的信息比较敏感,所以有握手过程 if err := srv.startListening(); err != nil { return err } } srv.loopWG.Add(1) //在协程里面运行, 开始监听各个信号量,处理peer的增删改. //并且一开始会scheduleTasks尝试连接其他节点 go srv.run(dialer) srv.running = true return nil }
这部分代码调用 startListening 开始监听TCP请求, 后面创建server.run协程处理连接池管理任务。下面分别来看一下具体流程。
监听对端连接请求startListening()
startListening 创建一个TCP监听句柄, 然后创建listenLoop协程进行不断的监听请求,进行Accept,然后握手,建立安全连接. startListening 创建协程后会立即返回。
func (srv *Server) startListening() error { // Launch the TCP listener. //先创建一个TCP监听句柄, 然后创建listenLoop协程进行不断的监听请求,进行Accept,然后握手,建立安全连接 listener, err := net.Listen("tcp", srv.ListenAddr) if err != nil { return err } laddr := listener.Addr().(*net.TCPAddr) srv.ListenAddr = laddr.String() srv.listener = listener srv.loopWG.Add(1) //进入监听协程监听请求 go srv.listenLoop() return nil }
接下来看一下 listenLoop 协程的工作,分两部分,初始化,接受连接, 以及连接建立。
listenLoop 有一个有意思的功能,就是进行并发控制,通过管道slots 进行并发控制,具体可以看下面的注释。
func (srv *Server) listenLoop() { //startListening 调用设置的.在新的协程不断坚挺新连接进来 defer srv.loopWG.Done() srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab)) tokens := defaultMaxPendingPeers if srv.MaxPendingPeers > 0 { tokens = srv.MaxPendingPeers } slots := make(chan struct{}, tokens) for i := 0; i < tokens; i++ { //先直接放入这么多个slot进去,稍后就一个个获取了,用来实现并发控制。 //因为实际上的链家请求简历,是通过协程进行的,并非在本协程处理,所以用了管道的形式来控制并发 slots <- struct{}{} } for { // Wait for a handshake slot before accepting. <-slots //并发控制 var ( fd net.Conn err error ) for { //接受链接, 检查是否有问题,没问题就break fd, err = srv.listener.Accept() if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { srv.log.Debug("Temporary read error", "err", err) continue } else if err != nil { srv.log.Debug("Read error", "err", err) return } break }
srv.listener.Accept() 接受连接之后,便会创建新的协程专门处理这一个peer节点的请求。
if srv.NetRestrict != nil { if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) { srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr()) fd.Close() slots <- struct{}{} continue } } fd = newMeteredConn(fd, true) srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr()) //创建协程去处理这个链接 go func() { //TCP连接是加密的,所以需要进行两边的握手,互相交换公钥 srv.SetupConn(fd, inboundConn, nil) slots <- struct{}{} //通知干从的 listenLoop 协程可以继续下一个并发了 }() } }
接下来调用srv.SetupConn进行协议握手,TCP传输链接是加密传输的,目前使用的是RLPx加密协议,处理函数是newRLPX 。
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error { self := srv.Self() if self == nil { return errors.New("shutdown") } //newTransport 目前使用的是RLPx加密协议,处理函数是newRLPX c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)} err := srv.setupConn(c, flags, dialDest) if err != nil { c.close(err) srv.log.Trace("Setting up connection failed", "id", c.id, "err", err) } return err }
继续来看一下setupConn 进行一个TCP协议上的握手功能。setupConn 主要是对握手协议的处理,发送数据,这里我们具体不讲协议内容了:
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error { //进行一个TCP协议上的握手 // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() running := srv.running srv.lock.Unlock() if !running { return errServerStopped } // Run the encryption handshake. var err error if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil { srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) return err } clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags) // For dialed connections, check that the remote public key matches. if dialDest != nil && c.id != dialDest.ID { clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID) return DiscUnexpectedIdentity } err = srv.checkpoint(c, srv.posthandshake) if err != nil { clog.Trace("Rejected peer before protocol handshake", "err", err) return err } // Run the protocol handshake phs, err := c.doProtoHandshake(srv.ourHandshake) //--- err = srv.checkpoint(c, srv.addpeer)
最后srv.checkpoint(c, srv.addpeer)比较重要,会在addpeer 上面插入一条消息,而消息的另外一端就是 server.run().
当最后协议握手完成后,会触发到下面的代码:
case c := <-srv.addpeer: // At this point the connection is past the protocol handshake. // Its capabilities are known and the remote identity is verified. err := srv.protoHandshakeChecks(peers, inboundCount, c) if err == nil { // The handshakes are done and it passed all checks. p := newPeer(c, srv.Protocols) // If message events are enabled, pass the peerFeed // to the peer if srv.EnableMsgEvents { p.events = &srv.peerFeed } name := truncateName(c.name) srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) go srv.runPeer(p) peers[c.id] = p if p.Inbound() { inboundCount++ } } select { case c.cont <- err: case <-srv.quit: break running }
上面 protoHandshakeChecks 进行一些必要检查后如果正确,那么调用newPeer创建一个peer结构,然后创建协程go srv.runPeer(p), 并记录这个peer节点:peers[c.id] = p。
runPeer 的主要任务是进行回调,以及创建了peer的时间处理,另外最主要的是调用peer.run启动跟这个peer的协程读写循环, 具体暂时不细说了,简单看下下面的代码:
func (srv *Server) runPeer(p *Peer) { //跟一个peer连接建立完成后,调用这里来启动一个peer的维护,监听工作 if srv.newPeerHook != nil { srv.newPeerHook(p) } // broadcast peer add srv.peerFeed.Send(&PeerEvent{ Type: PeerEventTypeAdd, Peer: p.ID(), }) // run the protocol remoteRequested, err := p.run() // broadcast peer drop srv.peerFeed.Send(&PeerEvent{ Type: PeerEventTypeDrop, Peer: p.ID(), Error: err.Error(), }) // Note: run waits for existing peers to be sent on srv.delpeer // before returning, so this send should not select on srv.quit. srv.delpeer <- peerDrop{p, err, remoteRequested} }
到此基本上listenLoop()函数完成了, 主要任务就是不断的accept请求,然后进行加密协议握手,创建连接, 之后启动链接相关的事件循环,进行数据读写等。
连接池管理协程 server.run()
server.run 主要任务是进行 接池管理协程,负责维护TCP连接列表, 协程里面运行, 开始监听各个信号量,处理peer的增删改。 上面说的startListening 主要进行被动的接受链接然后进行握手,最后加入到连接池。
而server.run则是一个事件循环和连接池管理功能。
首先进行初始化:
func (srv *Server) run(dialstate dialer) { //连接池管理协程,负责维护TCP连接列表 //在协程里面运行, 开始监听各个信号量,处理peer的增删改 defer srv.loopWG.Done() var ( peers = make(map[discover.NodeID]*Peer) inboundCount = 0 trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes)) taskdone = make(chan task, maxActiveDialTasks) //当前正在使用的链接 runningTasks []task queuedTasks []task // tasks that can't run yet ) // Put trusted nodes into a map to speed up checks. // Trusted peers are loaded on startup and cannot be // modified while the server is running. //可信节点,配置的 由trusted-nodes.json设置 for _, n := range srv.TrustedNodes { trusted[n.ID] = true } // removes t from runningTasks delTask := func(t task) { for i := range runningTasks { if runningTasks[i] == t { runningTasks = append(runningTasks[:i], runningTasks[i+1:]...) break } } }
接下来创建了2个最重要的函数: startTasks 和 scheduleTasks , 其功能如下:
1. startTasks 用来跟参数数组代表的节点一个个创建协程建立连接,并且每次活跃的正在建立连接的任务数不超过 maxActiveDialTasks。
2. scheduleTasks 如果没有足够的节点,那么调用这里开始去尝试从p2p discover 中找出更多节点来用。scheduleTasks 会调用startTasks进行链接。
//用来跟参数数组代表的节点一个个创建协程简历连接 startTasks := func(ts []task) (rest []task) { i := 0 //最多可以建立maxActiveDialTasks 个链接 for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { t := ts[i] srv.log.Trace("New dial task", "task", t) go func() { t.Do(srv); taskdone <- t }() //当前正在使用的链接 runningTasks = append(runningTasks, t) } return ts[i:] } //用来扫描服务发现的P2P节点,建立TCP连接, 这也是最重要的步骤了 scheduleTasks := func() { // Start from queue first. // 先尝试用startTasks 调用 queuedTasks 列表,试图一个个建立链接,不过可能由于maxActiveDialTasks 的限制,默认16个 //所以下面这行代码可以不断运行,直到为空 queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...) // Query dialer for new tasks and start as many as possible now. //如果已经建立的链接还不到16个,那么尝试从p2p discover 中找出更多节点来用 if len(runningTasks) < maxActiveDialTasks { nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now()) queuedTasks = append(queuedTasks, startTasks(nt)...) } } running: for { //不断尝试去建立P2P的TCP连接 scheduleTasks()
从上面 得知,server.run 一开始在循环开始便调用scheduleTasks()函数来尝试链接对端节点,如果不够就从discover模块申请更多节点。下面分别讲一下Do 和 newTasks;
dialTask.Do 跟节点建立链接;
dialTask.Do 函数用来跟一个节点建立链接,节点从哪来呢?bootnodes, 以及s.ntab.ReadRandomNodes(s.randomNodes) 返回的节点。
func (t *dialTask) Do(srv *Server) { //这函数用来跟一个节点建立链接,节点从哪来呢?bootnodes, 以及s.ntab.ReadRandomNodes(s.randomNodes) 返回的节点。 //调用方来自于 P2P.Server 的 loop协程,里面会scheduleTasks -> startTasks -> Do() 来创建连接,默认16个 if t.dest.Incomplete() { if !t.resolve(srv) { return } } //进去,发送connect tcp err := t.dial(srv, t.dest) }
Do主要就是调用了dialTask.dial 函数来连接某个TCP节点, 调用路径scheduleTasks -> startTasks -> Do()->Dial(),
func (t *dialTask) dial(srv *Server, dest *discover.Node) error { //连接某个TCP节点, 调用路径scheduleTasks -> startTasks -> Do()->Dial() //下面其实是TCPDialer.Dial, 很简单,发送一个TCP链接请求 t.Dialer.Dial("tcp", addr.String()) //得到一个连接句柄 fd, err := srv.Dialer.Dial(dest) if err != nil { return &dialError{err} } mfd := newMeteredConn(fd, false) //进行握手,创建安全连接 return srv.SetupConn(mfd, t.flags, dest) }
srv.Dialer.Dial 就是用来建立TCP链接,最后就是调用SetupConn 来进行握手了。握手完成返回是否有错误。
dialstate.newTasks申请更多节点使用
newTasks 按照一定优先级,查找可以连接的节点,首先从bootnode开始,然后从p2p 服务发现的节点中随机取一些出来,返回这些节点给调用方。下面贴一下前面部分的代码变清楚了:
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task { //按照一定优先级,查找可以连接的节点,首先从bootnode开始,然后从p2p 服务发现的节点中随机取一些出来,返回这些节点给调用方 //这函数由 (srv *Server) run(dialstate dialer) 来调用, 具体为scheduleTasks -> newTask if s.start.IsZero() { s.start = now } var newtasks []task addDial := func(flag connFlag, n *discover.Node) bool { //检查连接状态 , 如果连接可用,那么标记为正在连接,并且加到待连接任务newtasks 里面 if err := s.checkDial(n, peers); err != nil { log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err) return false } s.dialing[n.ID] = flag //标记正在进行 newtasks = append(newtasks, &dialTask{flags: flag, dest: n}) return true }
server.run接下来就是简单的进行各个管道的监听,如果有事件便处理,主要还是涉及到握手协议的流程,以及进行peer的增删改操作。
running: for { //不断尝试去建立P2P的TCP连接 scheduleTasks() //下面开始监听各个管道的事件,来处理peer的增删改操作 select { case <-srv.quit: case n := <-srv.addstatic: case n := <-srv.removestatic: case op := <-srv.peerOp: case t := <-taskdone: case c := <-srv.posthandshake: case c := <-srv.addpeer://握手完成,进行peer的增加和启动监听事件。 case pd := <-srv.delpeer; }
到此基本上TCP连接池管理完成,具体握手细节,以及peer的处理暂时没有细化。整体来说,TCP模块靠startListening 来进行被动连接监听, server.run进行主动的连接池管理,以及连接状态跳转,peer的增删改操作。
server.run在如果连接数不够,那么会开始进行不断的尝试,按照一定优先级去查找可以连接的节点,首先从bootnode开始,然后从p2p 服务发现的节点中随机取一些出来;
博主您好,读了您的文章,感觉受益匪浅。
想请教一个问题:以太坊是怎么解决日蚀攻击的呢?
If some one desires expert view concerning blogging afterwardd i
recommend him/her to visit this website, Keep up the pleasant work. http://Forum.nijanse.com/index.php?action=profile;u=137262