首页 > geth, GO > geth以太坊源码分析-P2P模块TCP连接池网络通信机制原理

geth以太坊源码分析-P2P模块TCP连接池网络通信机制原理

2018年7月1日 发表评论 阅读评论 8503次阅读    

geth的P2P模块有2个重要的部分:基于UDP的节点发现模块 以及 TCP数据传输连接池模块。
之前讲过节点发现部分,用来根据设置的少了BootstrapNodes节点来发现更多全网的其他节点,这部分只是发现节点并找出其中可以ping通的节点,但是还没有进行使用,还没建立TCP连接进行数据传输,协议处理等。
这里分析一下P2P系统的TCP连接池是怎么建立的,以及是怎么跟其他节点通信的。

启动TCP监听,维护协程 服务

之前知道,在P2P的Server.start()会启动 discover.ListenUDP 开启后台协程进行UDP监听,不断读取数据包进行处理,从而实现UDP协议的节点发现逻辑。之后,便会开始进行TCP层的节点连接池功能,开始监听其他节点的请求,进行加密握手,以及不断去做连接池维护。
来看看 P2P的Server.start()后面部分代码:

    dynPeers := srv.maxDialedConns()
    //新建一个dialstate结构返回 , StaticNodes 会直接加到srv.static[]数组里面
    //dialer是用来接听到上面的一堆管道事件后,管理nodes列表的, 发送TCP连接的
    dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)

    // handshake
    srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.Public
Key)}
    for _, p := range srv.Protocols {
        srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
    }
    // listen/dial
    if srv.ListenAddr != "" {
        //开始监听TCP请求, TCP端口是需要加密的,传输的信息比较敏感,所以有握手过程
        if err := srv.startListening(); err != nil {
            return err
        }
    }
    srv.loopWG.Add(1)
    //在协程里面运行, 开始监听各个信号量,处理peer的增删改.
    //并且一开始会scheduleTasks尝试连接其他节点
    go srv.run(dialer)
    srv.running = true
    return nil
}

这部分代码调用 startListening 开始监听TCP请求, 后面创建server.run协程处理连接池管理任务。下面分别来看一下具体流程。

监听对端连接请求startListening()

startListening 创建一个TCP监听句柄, 然后创建listenLoop协程进行不断的监听请求,进行Accept,然后握手,建立安全连接. startListening 创建协程后会立即返回。

func (srv *Server) startListening() error {
    // Launch the TCP listener.
    //先创建一个TCP监听句柄, 然后创建listenLoop协程进行不断的监听请求,进行Accept,然后握手,建立安全连接
    listener, err := net.Listen("tcp", srv.ListenAddr)
    if err != nil {
        return err
    }
    laddr := listener.Addr().(*net.TCPAddr)
    srv.ListenAddr = laddr.String()
    srv.listener = listener
    srv.loopWG.Add(1)
    //进入监听协程监听请求
    go srv.listenLoop()
    return nil
}

接下来看一下 listenLoop 协程的工作,分两部分,初始化,接受连接, 以及连接建立。
listenLoop 有一个有意思的功能,就是进行并发控制,通过管道slots 进行并发控制,具体可以看下面的注释。

func (srv *Server) listenLoop() {
    //startListening 调用设置的.在新的协程不断坚挺新连接进来
    defer srv.loopWG.Done()
    srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))

    tokens := defaultMaxPendingPeers
    if srv.MaxPendingPeers > 0 {
        tokens = srv.MaxPendingPeers
    }
    slots := make(chan struct{}, tokens)
    for i := 0; i < tokens; i++ {
        //先直接放入这么多个slot进去,稍后就一个个获取了,用来实现并发控制。
        //因为实际上的链家请求简历,是通过协程进行的,并非在本协程处理,所以用了管道的形式来控制并发
        slots <- struct{}{}
    }

    for {
        // Wait for a handshake slot before accepting.
        <-slots //并发控制

        var (
            fd  net.Conn
            err error
        )
        for {
            //接受链接, 检查是否有问题,没问题就break
            fd, err = srv.listener.Accept()
            if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
                srv.log.Debug("Temporary read error", "err", err)
                continue
            } else if err != nil {
                srv.log.Debug("Read error", "err", err)
                return
            }
            break
        }

srv.listener.Accept() 接受连接之后,便会创建新的协程专门处理这一个peer节点的请求。

        if srv.NetRestrict != nil {
            if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
                srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
                fd.Close()
                slots <- struct{}{}
                continue
            }
        }

        fd = newMeteredConn(fd, true)
        srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
        //创建协程去处理这个链接
        go func() {
            //TCP连接是加密的,所以需要进行两边的握手,互相交换公钥
            srv.SetupConn(fd, inboundConn, nil)
            slots <- struct{}{} //通知干从的 listenLoop 协程可以继续下一个并发了
        }()
    }
}

接下来调用srv.SetupConn进行协议握手,TCP传输链接是加密传输的,目前使用的是RLPx加密协议,处理函数是newRLPX 。

func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
    self := srv.Self()
    if self == nil {
        return errors.New("shutdown")
    }
    //newTransport 目前使用的是RLPx加密协议,处理函数是newRLPX 
    c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
    err := srv.setupConn(c, flags, dialDest)
    if err != nil {
        c.close(err)
        srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
    }
    return err
}

继续来看一下setupConn 进行一个TCP协议上的握手功能。setupConn 主要是对握手协议的处理,发送数据,这里我们具体不讲协议内容了:

func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
    //进行一个TCP协议上的握手
    // Prevent leftover pending conns from entering the handshake.
    srv.lock.Lock()
    running := srv.running
    srv.lock.Unlock()
    if !running {
        return errServerStopped
    }
    // Run the encryption handshake.
    var err error
    if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
        srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
        return err
    }
    clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
    // For dialed connections, check that the remote public key matches.
    if dialDest != nil && c.id != dialDest.ID {
        clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID)
        return DiscUnexpectedIdentity
    }
    err = srv.checkpoint(c, srv.posthandshake)
    if err != nil {
        clog.Trace("Rejected peer before protocol handshake", "err", err)
        return err
    }
    // Run the protocol handshake
    phs, err := c.doProtoHandshake(srv.ourHandshake)
    //---
    err = srv.checkpoint(c, srv.addpeer)

最后srv.checkpoint(c, srv.addpeer)比较重要,会在addpeer 上面插入一条消息,而消息的另外一端就是 server.run().
当最后协议握手完成后,会触发到下面的代码:

        case c := <-srv.addpeer:
            // At this point the connection is past the protocol handshake.
            // Its capabilities are known and the remote identity is verified.
            err := srv.protoHandshakeChecks(peers, inboundCount, c)
            if err == nil {
                // The handshakes are done and it passed all checks.
                p := newPeer(c, srv.Protocols)
                // If message events are enabled, pass the peerFeed
                // to the peer
                if srv.EnableMsgEvents {
                    p.events = &srv.peerFeed
                }
                name := truncateName(c.name)
                srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
                go srv.runPeer(p)
                peers[c.id] = p
                if p.Inbound() {
                    inboundCount++
                }
            }
            select {
            case c.cont <- err:
            case <-srv.quit:
                break running
            }

上面 protoHandshakeChecks 进行一些必要检查后如果正确,那么调用newPeer创建一个peer结构,然后创建协程go srv.runPeer(p), 并记录这个peer节点:peers[c.id] = p。
runPeer 的主要任务是进行回调,以及创建了peer的时间处理,另外最主要的是调用peer.run启动跟这个peer的协程读写循环, 具体暂时不细说了,简单看下下面的代码:

func (srv *Server) runPeer(p *Peer) {
//跟一个peer连接建立完成后,调用这里来启动一个peer的维护,监听工作
    if srv.newPeerHook != nil {
        srv.newPeerHook(p)
    }

    // broadcast peer add
    srv.peerFeed.Send(&PeerEvent{
        Type: PeerEventTypeAdd,
        Peer: p.ID(),
    })

    // run the protocol
    remoteRequested, err := p.run()

    // broadcast peer drop
    srv.peerFeed.Send(&PeerEvent{
        Type:  PeerEventTypeDrop,
        Peer:  p.ID(),
        Error: err.Error(),
    })

    // Note: run waits for existing peers to be sent on srv.delpeer
    // before returning, so this send should not select on srv.quit.
    srv.delpeer <- peerDrop{p, err, remoteRequested}
}

到此基本上listenLoop()函数完成了, 主要任务就是不断的accept请求,然后进行加密协议握手,创建连接, 之后启动链接相关的事件循环,进行数据读写等。

连接池管理协程 server.run()

server.run 主要任务是进行 接池管理协程,负责维护TCP连接列表, 协程里面运行, 开始监听各个信号量,处理peer的增删改。 上面说的startListening 主要进行被动的接受链接然后进行握手,最后加入到连接池。
而server.run则是一个事件循环和连接池管理功能。

首先进行初始化:

func (srv *Server) run(dialstate dialer) {
    //连接池管理协程,负责维护TCP连接列表
    //在协程里面运行, 开始监听各个信号量,处理peer的增删改
    defer srv.loopWG.Done()
    var (
        peers        = make(map[discover.NodeID]*Peer)
        inboundCount = 0
        trusted      = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
        taskdone     = make(chan task, maxActiveDialTasks)
        //当前正在使用的链接
        runningTasks []task
        queuedTasks  []task // tasks that can't run yet
    )
    // Put trusted nodes into a map to speed up checks.
    // Trusted peers are loaded on startup and cannot be
    // modified while the server is running.
    //可信节点,配置的 由trusted-nodes.json设置
    for _, n := range srv.TrustedNodes {
        trusted[n.ID] = true
    }

    // removes t from runningTasks
    delTask := func(t task) {
        for i := range runningTasks {
            if runningTasks[i] == t {
                runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
                break
            }
        }
    }

接下来创建了2个最重要的函数: startTasks 和 scheduleTasks , 其功能如下:
1. startTasks 用来跟参数数组代表的节点一个个创建协程建立连接,并且每次活跃的正在建立连接的任务数不超过 maxActiveDialTasks。
2. scheduleTasks 如果没有足够的节点,那么调用这里开始去尝试从p2p discover 中找出更多节点来用。scheduleTasks 会调用startTasks进行链接。

    //用来跟参数数组代表的节点一个个创建协程简历连接
    startTasks := func(ts []task) (rest []task) {
        i := 0
        //最多可以建立maxActiveDialTasks 个链接
        for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
            t := ts[i]
            srv.log.Trace("New dial task", "task", t)
            go func() { t.Do(srv); taskdone <- t }()
            //当前正在使用的链接
            runningTasks = append(runningTasks, t)
        }
        return ts[i:]
    }
    //用来扫描服务发现的P2P节点,建立TCP连接, 这也是最重要的步骤了
    scheduleTasks := func() {
        // Start from queue first.
        // 先尝试用startTasks 调用 queuedTasks 列表,试图一个个建立链接,不过可能由于maxActiveDialTasks 的限制,默认16个
        //所以下面这行代码可以不断运行,直到为空
        queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
        // Query dialer for new tasks and start as many as possible now.
        //如果已经建立的链接还不到16个,那么尝试从p2p discover  中找出更多节点来用
        if len(runningTasks) < maxActiveDialTasks {
            nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
            queuedTasks = append(queuedTasks, startTasks(nt)...)
        }
    }
    running:
    for {
        //不断尝试去建立P2P的TCP连接
        scheduleTasks()

从上面 得知,server.run 一开始在循环开始便调用scheduleTasks()函数来尝试链接对端节点,如果不够就从discover模块申请更多节点。下面分别讲一下Do 和 newTasks;

dialTask.Do 跟节点建立链接;

dialTask.Do 函数用来跟一个节点建立链接,节点从哪来呢?bootnodes, 以及s.ntab.ReadRandomNodes(s.randomNodes) 返回的节点。

func (t *dialTask) Do(srv *Server) {
    //这函数用来跟一个节点建立链接,节点从哪来呢?bootnodes, 以及s.ntab.ReadRandomNodes(s.randomNodes) 返回的节点。 
    //调用方来自于 P2P.Server 的 loop协程,里面会scheduleTasks -> startTasks -> Do() 来创建连接,默认16个
    if t.dest.Incomplete() {
        if !t.resolve(srv) {
            return
        }
    }
    //进去,发送connect tcp
    err := t.dial(srv, t.dest)  
}  

Do主要就是调用了dialTask.dial 函数来连接某个TCP节点, 调用路径scheduleTasks -> startTasks -> Do()->Dial(),

func (t *dialTask) dial(srv *Server, dest *discover.Node) error { 
    //连接某个TCP节点, 调用路径scheduleTasks -> startTasks -> Do()->Dial()
    //下面其实是TCPDialer.Dial,  很简单,发送一个TCP链接请求 t.Dialer.Dial("tcp", addr.String())
    //得到一个连接句柄
    fd, err := srv.Dialer.Dial(dest)
    if err != nil {
        return &dialError{err}
    }
    mfd := newMeteredConn(fd, false)
    //进行握手,创建安全连接
    return srv.SetupConn(mfd, t.flags, dest)
} 

srv.Dialer.Dial 就是用来建立TCP链接,最后就是调用SetupConn 来进行握手了。握手完成返回是否有错误。

dialstate.newTasks申请更多节点使用

newTasks 按照一定优先级,查找可以连接的节点,首先从bootnode开始,然后从p2p 服务发现的节点中随机取一些出来,返回这些节点给调用方。下面贴一下前面部分的代码变清楚了:

func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
    //按照一定优先级,查找可以连接的节点,首先从bootnode开始,然后从p2p 服务发现的节点中随机取一些出来,返回这些节点给调用方
    //这函数由 (srv *Server) run(dialstate dialer) 来调用, 具体为scheduleTasks -> newTask
    if s.start.IsZero() {
        s.start = now
    }

    var newtasks []task
    addDial := func(flag connFlag, n *discover.Node) bool {
        //检查连接状态 , 如果连接可用,那么标记为正在连接,并且加到待连接任务newtasks 里面
        if err := s.checkDial(n, peers); err != nil {
            log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err)
            return false
        }
        s.dialing[n.ID] = flag //标记正在进行
        newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
        return true
    }

server.run接下来就是简单的进行各个管道的监听,如果有事件便处理,主要还是涉及到握手协议的流程,以及进行peer的增删改操作。

running:
    for {
        //不断尝试去建立P2P的TCP连接
        scheduleTasks()

        //下面开始监听各个管道的事件,来处理peer的增删改操作
        select {
        case <-srv.quit:
        case n := <-srv.addstatic:
        case n := <-srv.removestatic:
        case op := <-srv.peerOp:
        case t := <-taskdone:
        case c := <-srv.posthandshake:
        case c := <-srv.addpeer://握手完成,进行peer的增加和启动监听事件。
        case pd := <-srv.delpeer;
        }

到此基本上TCP连接池管理完成,具体握手细节,以及peer的处理暂时没有细化。整体来说,TCP模块靠startListening 来进行被动连接监听, server.run进行主动的连接池管理,以及连接状态跳转,peer的增删改操作。
server.run在如果连接数不够,那么会开始进行不断的尝试,按照一定优先级去查找可以连接的节点,首先从bootnode开始,然后从p2p 服务发现的节点中随机取一些出来;

Share
分类: geth, GO 标签: , ,
  1. 2018年7月5日16:43 | #1

    If some one desires expert view concerning blogging afterwardd i
    recommend him/her to visit this website, Keep up the pleasant work. http://Forum.nijanse.com/index.php?action=profile;u=137262

  1. 2018年7月15日00:57 | #1

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。