geth以太坊源码分析-P2P模块TCP连接池网络通信机制原理
geth的P2P模块有2个重要的部分:基于UDP的节点发现模块 以及 TCP数据传输连接池模块。
之前讲过节点发现部分,用来根据设置的少了BootstrapNodes节点来发现更多全网的其他节点,这部分只是发现节点并找出其中可以ping通的节点,但是还没有进行使用,还没建立TCP连接进行数据传输,协议处理等。
这里分析一下P2P系统的TCP连接池是怎么建立的,以及是怎么跟其他节点通信的。
启动TCP监听,维护协程 服务
之前知道,在P2P的Server.start()会启动 discover.ListenUDP 开启后台协程进行UDP监听,不断读取数据包进行处理,从而实现UDP协议的节点发现逻辑。之后,便会开始进行TCP层的节点连接池功能,开始监听其他节点的请求,进行加密握手,以及不断去做连接池维护。
来看看 P2P的Server.start()后面部分代码:
dynPeers := srv.maxDialedConns()
//新建一个dialstate结构返回 , StaticNodes 会直接加到srv.static[]数组里面
//dialer是用来接听到上面的一堆管道事件后,管理nodes列表的, 发送TCP连接的
dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
// handshake
srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.Public
Key)}
for _, p := range srv.Protocols {
srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
}
// listen/dial
if srv.ListenAddr != "" {
//开始监听TCP请求, TCP端口是需要加密的,传输的信息比较敏感,所以有握手过程
if err := srv.startListening(); err != nil {
return err
}
}
srv.loopWG.Add(1)
//在协程里面运行, 开始监听各个信号量,处理peer的增删改.
//并且一开始会scheduleTasks尝试连接其他节点
go srv.run(dialer)
srv.running = true
return nil
}
这部分代码调用 startListening 开始监听TCP请求, 后面创建server.run协程处理连接池管理任务。下面分别来看一下具体流程。
监听对端连接请求startListening()
startListening 创建一个TCP监听句柄, 然后创建listenLoop协程进行不断的监听请求,进行Accept,然后握手,建立安全连接. startListening 创建协程后会立即返回。
func (srv *Server) startListening() error {
// Launch the TCP listener.
//先创建一个TCP监听句柄, 然后创建listenLoop协程进行不断的监听请求,进行Accept,然后握手,建立安全连接
listener, err := net.Listen("tcp", srv.ListenAddr)
if err != nil {
return err
}
laddr := listener.Addr().(*net.TCPAddr)
srv.ListenAddr = laddr.String()
srv.listener = listener
srv.loopWG.Add(1)
//进入监听协程监听请求
go srv.listenLoop()
return nil
}
接下来看一下 listenLoop 协程的工作,分两部分,初始化,接受连接, 以及连接建立。
listenLoop 有一个有意思的功能,就是进行并发控制,通过管道slots 进行并发控制,具体可以看下面的注释。
func (srv *Server) listenLoop() {
//startListening 调用设置的.在新的协程不断坚挺新连接进来
defer srv.loopWG.Done()
srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))
tokens := defaultMaxPendingPeers
if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
//先直接放入这么多个slot进去,稍后就一个个获取了,用来实现并发控制。
//因为实际上的链家请求简历,是通过协程进行的,并非在本协程处理,所以用了管道的形式来控制并发
slots <- struct{}{}
}
for {
// Wait for a handshake slot before accepting.
<-slots //并发控制
var (
fd net.Conn
err error
)
for {
//接受链接, 检查是否有问题,没问题就break
fd, err = srv.listener.Accept()
if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
srv.log.Debug("Temporary read error", "err", err)
continue
} else if err != nil {
srv.log.Debug("Read error", "err", err)
return
}
break
}
srv.listener.Accept() 接受连接之后,便会创建新的协程专门处理这一个peer节点的请求。
if srv.NetRestrict != nil {
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
fd.Close()
slots <- struct{}{}
continue
}
}
fd = newMeteredConn(fd, true)
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
//创建协程去处理这个链接
go func() {
//TCP连接是加密的,所以需要进行两边的握手,互相交换公钥
srv.SetupConn(fd, inboundConn, nil)
slots <- struct{}{} //通知干从的 listenLoop 协程可以继续下一个并发了
}()
}
}
接下来调用srv.SetupConn进行协议握手,TCP传输链接是加密传输的,目前使用的是RLPx加密协议,处理函数是newRLPX 。
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
self := srv.Self()
if self == nil {
return errors.New("shutdown")
}
//newTransport 目前使用的是RLPx加密协议,处理函数是newRLPX
c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
err := srv.setupConn(c, flags, dialDest)
if err != nil {
c.close(err)
srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
}
return err
}
继续来看一下setupConn 进行一个TCP协议上的握手功能。setupConn 主要是对握手协议的处理,发送数据,这里我们具体不讲协议内容了:
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
//进行一个TCP协议上的握手
// Prevent leftover pending conns from entering the handshake.
srv.lock.Lock()
running := srv.running
srv.lock.Unlock()
if !running {
return errServerStopped
}
// Run the encryption handshake.
var err error
if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
return err
}
clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
// For dialed connections, check that the remote public key matches.
if dialDest != nil && c.id != dialDest.ID {
clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID)
return DiscUnexpectedIdentity
}
err = srv.checkpoint(c, srv.posthandshake)
if err != nil {
clog.Trace("Rejected peer before protocol handshake", "err", err)
return err
}
// Run the protocol handshake
phs, err := c.doProtoHandshake(srv.ourHandshake)
//---
err = srv.checkpoint(c, srv.addpeer)
最后srv.checkpoint(c, srv.addpeer)比较重要,会在addpeer 上面插入一条消息,而消息的另外一端就是 server.run().
当最后协议握手完成后,会触发到下面的代码:
case c := <-srv.addpeer:
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
err := srv.protoHandshakeChecks(peers, inboundCount, c)
if err == nil {
// The handshakes are done and it passed all checks.
p := newPeer(c, srv.Protocols)
// If message events are enabled, pass the peerFeed
// to the peer
if srv.EnableMsgEvents {
p.events = &srv.peerFeed
}
name := truncateName(c.name)
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
go srv.runPeer(p)
peers[c.id] = p
if p.Inbound() {
inboundCount++
}
}
select {
case c.cont <- err:
case <-srv.quit:
break running
}
上面 protoHandshakeChecks 进行一些必要检查后如果正确,那么调用newPeer创建一个peer结构,然后创建协程go srv.runPeer(p), 并记录这个peer节点:peers[c.id] = p。
runPeer 的主要任务是进行回调,以及创建了peer的时间处理,另外最主要的是调用peer.run启动跟这个peer的协程读写循环, 具体暂时不细说了,简单看下下面的代码:
func (srv *Server) runPeer(p *Peer) {
//跟一个peer连接建立完成后,调用这里来启动一个peer的维护,监听工作
if srv.newPeerHook != nil {
srv.newPeerHook(p)
}
// broadcast peer add
srv.peerFeed.Send(&PeerEvent{
Type: PeerEventTypeAdd,
Peer: p.ID(),
})
// run the protocol
remoteRequested, err := p.run()
// broadcast peer drop
srv.peerFeed.Send(&PeerEvent{
Type: PeerEventTypeDrop,
Peer: p.ID(),
Error: err.Error(),
})
// Note: run waits for existing peers to be sent on srv.delpeer
// before returning, so this send should not select on srv.quit.
srv.delpeer <- peerDrop{p, err, remoteRequested}
}
到此基本上listenLoop()函数完成了, 主要任务就是不断的accept请求,然后进行加密协议握手,创建连接, 之后启动链接相关的事件循环,进行数据读写等。
连接池管理协程 server.run()
server.run 主要任务是进行 接池管理协程,负责维护TCP连接列表, 协程里面运行, 开始监听各个信号量,处理peer的增删改。 上面说的startListening 主要进行被动的接受链接然后进行握手,最后加入到连接池。
而server.run则是一个事件循环和连接池管理功能。
首先进行初始化:
func (srv *Server) run(dialstate dialer) {
//连接池管理协程,负责维护TCP连接列表
//在协程里面运行, 开始监听各个信号量,处理peer的增删改
defer srv.loopWG.Done()
var (
peers = make(map[discover.NodeID]*Peer)
inboundCount = 0
trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
taskdone = make(chan task, maxActiveDialTasks)
//当前正在使用的链接
runningTasks []task
queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup and cannot be
// modified while the server is running.
//可信节点,配置的 由trusted-nodes.json设置
for _, n := range srv.TrustedNodes {
trusted[n.ID] = true
}
// removes t from runningTasks
delTask := func(t task) {
for i := range runningTasks {
if runningTasks[i] == t {
runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
break
}
}
}
接下来创建了2个最重要的函数: startTasks 和 scheduleTasks , 其功能如下:
1. startTasks 用来跟参数数组代表的节点一个个创建协程建立连接,并且每次活跃的正在建立连接的任务数不超过 maxActiveDialTasks。
2. scheduleTasks 如果没有足够的节点,那么调用这里开始去尝试从p2p discover 中找出更多节点来用。scheduleTasks 会调用startTasks进行链接。
//用来跟参数数组代表的节点一个个创建协程简历连接
startTasks := func(ts []task) (rest []task) {
i := 0
//最多可以建立maxActiveDialTasks 个链接
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
t := ts[i]
srv.log.Trace("New dial task", "task", t)
go func() { t.Do(srv); taskdone <- t }()
//当前正在使用的链接
runningTasks = append(runningTasks, t)
}
return ts[i:]
}
//用来扫描服务发现的P2P节点,建立TCP连接, 这也是最重要的步骤了
scheduleTasks := func() {
// Start from queue first.
// 先尝试用startTasks 调用 queuedTasks 列表,试图一个个建立链接,不过可能由于maxActiveDialTasks 的限制,默认16个
//所以下面这行代码可以不断运行,直到为空
queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
// Query dialer for new tasks and start as many as possible now.
//如果已经建立的链接还不到16个,那么尝试从p2p discover 中找出更多节点来用
if len(runningTasks) < maxActiveDialTasks {
nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
queuedTasks = append(queuedTasks, startTasks(nt)...)
}
}
running:
for {
//不断尝试去建立P2P的TCP连接
scheduleTasks()
从上面 得知,server.run 一开始在循环开始便调用scheduleTasks()函数来尝试链接对端节点,如果不够就从discover模块申请更多节点。下面分别讲一下Do 和 newTasks;
dialTask.Do 跟节点建立链接;
dialTask.Do 函数用来跟一个节点建立链接,节点从哪来呢?bootnodes, 以及s.ntab.ReadRandomNodes(s.randomNodes) 返回的节点。
func (t *dialTask) Do(srv *Server) {
//这函数用来跟一个节点建立链接,节点从哪来呢?bootnodes, 以及s.ntab.ReadRandomNodes(s.randomNodes) 返回的节点。
//调用方来自于 P2P.Server 的 loop协程,里面会scheduleTasks -> startTasks -> Do() 来创建连接,默认16个
if t.dest.Incomplete() {
if !t.resolve(srv) {
return
}
}
//进去,发送connect tcp
err := t.dial(srv, t.dest)
}
Do主要就是调用了dialTask.dial 函数来连接某个TCP节点, 调用路径scheduleTasks -> startTasks -> Do()->Dial(),
func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
//连接某个TCP节点, 调用路径scheduleTasks -> startTasks -> Do()->Dial()
//下面其实是TCPDialer.Dial, 很简单,发送一个TCP链接请求 t.Dialer.Dial("tcp", addr.String())
//得到一个连接句柄
fd, err := srv.Dialer.Dial(dest)
if err != nil {
return &dialError{err}
}
mfd := newMeteredConn(fd, false)
//进行握手,创建安全连接
return srv.SetupConn(mfd, t.flags, dest)
}
srv.Dialer.Dial 就是用来建立TCP链接,最后就是调用SetupConn 来进行握手了。握手完成返回是否有错误。
dialstate.newTasks申请更多节点使用
newTasks 按照一定优先级,查找可以连接的节点,首先从bootnode开始,然后从p2p 服务发现的节点中随机取一些出来,返回这些节点给调用方。下面贴一下前面部分的代码变清楚了:
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
//按照一定优先级,查找可以连接的节点,首先从bootnode开始,然后从p2p 服务发现的节点中随机取一些出来,返回这些节点给调用方
//这函数由 (srv *Server) run(dialstate dialer) 来调用, 具体为scheduleTasks -> newTask
if s.start.IsZero() {
s.start = now
}
var newtasks []task
addDial := func(flag connFlag, n *discover.Node) bool {
//检查连接状态 , 如果连接可用,那么标记为正在连接,并且加到待连接任务newtasks 里面
if err := s.checkDial(n, peers); err != nil {
log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err)
return false
}
s.dialing[n.ID] = flag //标记正在进行
newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
return true
}
server.run接下来就是简单的进行各个管道的监听,如果有事件便处理,主要还是涉及到握手协议的流程,以及进行peer的增删改操作。
running:
for {
//不断尝试去建立P2P的TCP连接
scheduleTasks()
//下面开始监听各个管道的事件,来处理peer的增删改操作
select {
case <-srv.quit:
case n := <-srv.addstatic:
case n := <-srv.removestatic:
case op := <-srv.peerOp:
case t := <-taskdone:
case c := <-srv.posthandshake:
case c := <-srv.addpeer://握手完成,进行peer的增加和启动监听事件。
case pd := <-srv.delpeer;
}
到此基本上TCP连接池管理完成,具体握手细节,以及peer的处理暂时没有细化。整体来说,TCP模块靠startListening 来进行被动连接监听, server.run进行主动的连接池管理,以及连接状态跳转,peer的增删改操作。
server.run在如果连接数不够,那么会开始进行不断的尝试,按照一定优先级去查找可以连接的节点,首先从bootnode开始,然后从p2p 服务发现的节点中随机取一些出来;

博主您好,读了您的文章,感觉受益匪浅。
想请教一个问题:以太坊是怎么解决日蚀攻击的呢?
If some one desires expert view concerning blogging afterwardd i
recommend him/her to visit this website, Keep up the pleasant work. http://Forum.nijanse.com/index.php?action=profile;u=137262