Skip to content

Commit b7cb8c8

Browse files
committed
Fix PeerPool to use getFuture() only once when adding peer.
Add isAlive(peer) procedure. Add join(peer) procedure. Remove useless metric. Add notifyAndWait() procedure which will help to avoid race while disconnecting.
1 parent d16e127 commit b7cb8c8

File tree

2 files changed

+49
-20
lines changed

2 files changed

+49
-20
lines changed

beacon_chain/eth2_network.nim

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -275,12 +275,6 @@ declarePublicCounter nbc_successful_discoveries,
275275
declarePublicCounter nbc_failed_discoveries,
276276
"Number of failed discoveries"
277277

278-
const delayBuckets = [1.0, 5.0, 10.0, 20.0, 40.0, 60.0]
279-
280-
declareHistogram nbc_resolve_time,
281-
"Time(s) used while resolving peer information",
282-
buckets = delayBuckets
283-
284278
const
285279
snappy_implementation {.strdefine.} = "libp2p"
286280

@@ -348,6 +342,9 @@ proc getFuture*(peer: Peer): Future[void] {.inline.} =
348342
peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut")
349343
peer.disconnectedFut
350344

345+
proc isAlive*(peer: Peer): bool =
346+
peer.connectionState notin {Disconnecting, Disconnected}
347+
351348
proc getScore*(a: Peer): int =
352349
## Returns current score value for peer ``peer``.
353350
a.score
@@ -358,6 +355,41 @@ proc updateScore*(peer: Peer, score: int) {.inline.} =
358355
if peer.score > PeerScoreHighLimit:
359356
peer.score = PeerScoreHighLimit
360357

358+
proc join*(peer: Peer): Future[void] =
359+
var retFuture = newFuture[void]("peer.lifetime.join")
360+
let peerFut = peer.getFuture()
361+
let alreadyFinished = peerFut.finished()
362+
363+
proc continuation(udata: pointer) {.gcsafe.} =
364+
if not(retFuture.finished()):
365+
retFuture.complete()
366+
367+
proc cancellation(udata: pointer) {.gcsafe.} =
368+
if not(alreadyFinished):
369+
peerFut.removeCallback(continuation)
370+
371+
if alreadyFinished:
372+
# All the `peer.disconnectedFut` callbacks are already scheduled in current
373+
# `poll()` call, to avoid race we going to finish only in next `poll()`
374+
# call.
375+
callSoon(continuation, cast[pointer](retFuture))
376+
else:
377+
# `peer.disconnectedFut` is not yet finished, but we want to be scheduled
378+
# after all callbacks.
379+
peerFut.addCallback(continuation)
380+
381+
return retFuture
382+
383+
proc notifyAndWait*(peer: Peer): Future[void] =
384+
## Notify all the waiters that peer life is finished and wait until all
385+
## callbacks will be processed.
386+
let joinFut = peer.join()
387+
let fut = peer.disconnectedFut
388+
peer.connectionState = Disconnecting
389+
fut.complete()
390+
peer.disconnectedFut = nil
391+
joinFut
392+
361393
proc calcThroughput(dur: Duration, value: uint64): float =
362394
let secs = float(chronos.seconds(1).nanoseconds)
363395
if isZero(dur):
@@ -941,17 +973,16 @@ proc resolvePeer(peer: Peer) =
941973
discard peer.info.peerId.extractPublicKey(key)
942974
keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId()
943975

944-
debug "Peer's ENR recovery task started", node_id = $nodeId
945-
946976
# This is "fast-path" for peers which was dialed. In this case discovery
947977
# already has most recent ENR information about this peer.
948978
let gnode = peer.network.discovery.getNode(nodeId)
949979
if gnode.isSome():
950980
peer.enr = some(gnode.get().record)
951981
inc(nbc_successful_discoveries)
952-
let delay = now(chronos.Moment) - startTime
953-
nbc_resolve_time.observe(delay.toFloatSeconds())
954-
debug "Peer's ENR recovered", delay = $delay
982+
debug "Peer's ENR recovered"
983+
else:
984+
inc(nbc_failed_discoveries)
985+
debug "Peer's ENR could not be recovered"
955986

956987
proc handlePeer*(peer: Peer) {.async.} =
957988
let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction)
@@ -980,8 +1011,7 @@ proc handlePeer*(peer: Peer) {.async.} =
9801011
# Peer was added to PeerPool.
9811012
peer.score = NewPeerScore
9821013
peer.connectionState = Connected
983-
# We spawn task which will obtain ENR for this peer.
984-
resolvePeer(peer)
1014+
peer.resolvePeer()
9851015
debug "Peer successfully connected", peer = peer,
9861016
connections = peer.connections
9871017

@@ -1048,10 +1078,8 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
10481078
# Whatever caused disconnection, avoid connection spamming
10491079
node.addSeen(peerId, SeenTableTimeReconnect)
10501080

1051-
let fut = peer.disconnectedFut
1052-
if not(isNil(fut)):
1053-
fut.complete()
1054-
peer.disconnectedFut = nil
1081+
if not(isNil(peer.disconnectedFut)):
1082+
await peer.notifyAndWait()
10551083
else:
10561084
# TODO (cheatfate): This could be removed when bug will be fixed inside
10571085
# `nim-libp2p`.

beacon_chain/peer_pool.nim

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
339339

340340
proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B,
341341
peerType: PeerType) =
342+
mixin getFuture
342343
proc onPeerClosed(udata: pointer) {.gcsafe.} =
343344
discard pool.deletePeer(peer)
344345

@@ -367,13 +368,13 @@ proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} =
367368
## * Peer's lifetime future is not finished yet - (PeerStatus.DeadPeerError)
368369
##
369370
## If peer could be added to PeerPool procedure returns (PeerStatus.Success)
370-
mixin getKey, getFuture
371+
mixin getKey, isAlive
371372
if not(pool.checkPeerScore(peer)):
372373
PeerStatus.LowScoreError
373374
else:
374375
let peerKey = getKey(peer)
375376
if not(pool.registry.hasKey(peerKey)):
376-
if not(peer.getFuture().finished):
377+
if peer.isAlive():
377378
PeerStatus.Success
378379
else:
379380
PeerStatus.DeadPeerError
@@ -393,7 +394,7 @@ proc addPeerNoWait*[A, B](pool: PeerPool[A, B],
393394
## (PeerStatus.NoSpaceError)
394395
##
395396
## Procedure returns (PeerStatus.Success) on success.
396-
mixin getKey, getFuture
397+
mixin getKey
397398
let res = pool.checkPeer(peer)
398399
if res != PeerStatus.Success:
399400
res

0 commit comments

Comments
 (0)