Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 46 additions & 8 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ proc getFuture*(peer: Peer): Future[void] {.inline.} =
peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut")
peer.disconnectedFut

proc isAlive*(peer: Peer): bool =
  ## Returns ``true`` while peer ``peer`` has not yet entered the
  ## `Disconnecting` or `Disconnected` connection states.
  peer.connectionState notin {Disconnecting, Disconnected}

proc getScore*(a: Peer): int =
  ## Returns the current score value for peer ``a``.
  a.score
Expand All @@ -366,6 +369,43 @@ proc updateScore*(peer: Peer, score: int) {.inline.} =
if peer.score > PeerScoreHighLimit:
peer.score = PeerScoreHighLimit

proc join*(peer: Peer): Future[void] =
var retFuture = newFuture[void]("peer.lifetime.join")
let peerFut = peer.getFuture()

proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()

proc cancellation(udata: pointer) {.gcsafe.} =
if not(isNil(peerFut)):
peerFut.removeCallback(continuation)

if peerFut.finished():
# All the `peer.disconnectedFut` callbacks are already scheduled in the
# current `poll()` call; to avoid a race we are going to finish only on the
# next `poll()` call.
callSoon(continuation, cast[pointer](retFuture))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continuation does not use its udata pointer - why include a pointer here?

else:
# `peer.disconnectedFut` is not yet finished, but we want to be scheduled
# after all callbacks.
peerFut.addCallback(continuation)
retFuture.cancelCallback = cancellation

retFuture

proc notifyAndWait*(network: Eth2Node, peer: Peer): Future[void] =
  ## Notify all waiters that the life of peer ``peer`` has ended and return
  ## a future that completes once all registered callbacks have been
  ## processed — both `peer.join()` waiters and `PeerPool.joinPeer()`
  ## waiters.
  ##
  ## NOTE(review): the caller is expected to ensure `peer.disconnectedFut`
  ## is not nil before calling (see the check at the call site in
  ## `onConnEvent`) — TODO confirm whether the check should move here.
  let
    joinFut = peer.join()
    poolFut = network.peerPool.joinPeer(peer)
    discFut = peer.disconnectedFut
  # Mark the peer as going away before waking waiters, so `isAlive()`
  # already reports false inside their callbacks.
  peer.connectionState = Disconnecting
  discFut.complete()
  peer.disconnectedFut = nil
  allFutures(joinFut, poolFut)

proc calcThroughput(dur: Duration, value: uint64): float =
let secs = float(chronos.seconds(1).nanoseconds)
if isZero(dur):
Expand Down Expand Up @@ -1183,8 +1223,6 @@ proc resolvePeer(peer: Peer) =
discard peer.peerId.extractPublicKey(key)
keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId()

debug "Peer's ENR recovery task started", node_id = $nodeId

# This is "fast-path" for peers which was dialed. In this case discovery
# already has most recent ENR information about this peer.
let gnode = peer.network.discovery.getNode(nodeId)
Expand All @@ -1194,6 +1232,9 @@ proc resolvePeer(peer: Peer) =
let delay = now(chronos.Moment) - startTime
nbc_resolve_time.observe(delay.toFloatSeconds())
debug "Peer's ENR recovered", delay
else:
inc(nbc_failed_discoveries)
debug "Peer's ENR could not be recovered"

proc handlePeer*(peer: Peer) {.async.} =
let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction)
Expand Down Expand Up @@ -1222,8 +1263,7 @@ proc handlePeer*(peer: Peer) {.async.} =
# Peer was added to PeerPool.
peer.score = NewPeerScore
peer.connectionState = Connected
# We spawn task which will obtain ENR for this peer.
resolvePeer(peer)
peer.resolvePeer()
debug "Peer successfully connected", peer = peer,
connections = peer.connections

Expand Down Expand Up @@ -1290,10 +1330,8 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
# Whatever caused disconnection, avoid connection spamming
node.addSeen(peerId, SeenTableTimeReconnect)

let fut = peer.disconnectedFut
if not(isNil(fut)):
fut.complete()
peer.disconnectedFut = nil
if not(isNil(peer.disconnectedFut)):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this check be inside notifyAndWait?

await node.notifyAndWait(peer)
else:
# TODO (cheatfate): This could be removed when bug will be fixed inside
# `nim-libp2p`.
Expand Down
47 changes: 43 additions & 4 deletions beacon_chain/networking/peer_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type
peerType: PeerType
flags: set[PeerFlags]
index: int
future: Future[void]

PeerIndex = object
data: int
Expand Down Expand Up @@ -311,13 +312,16 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
dec(pool.curOutPeersCount)
dec(pool.acqOutPeersCount)

let fut = item[].future
# Indicate that we have an empty space
pool.fireNotFullEvent(item[])
# Cleanup storage with default item, and removing key from hashtable.
pool.storage[pindex] = PeerItem[A]()
pool.registry.del(key)
pool.peerDeleted(peer)
pool.peerCountChanged()
# Indicate that peer was deleted
fut.complete()
else:
if item[].peerType == PeerType.Incoming:
# If peer is available, then its copy present in heapqueue, so we need
Expand All @@ -336,24 +340,59 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
break
dec(pool.curOutPeersCount)

let fut = item[].future
# Indicate that we have an empty space
pool.fireNotFullEvent(item[])
# Cleanup storage with default item, and removing key from hashtable.
pool.storage[pindex] = PeerItem[A]()
pool.registry.del(key)
pool.peerDeleted(peer)
pool.peerCountChanged()
# Indicate that peer was deleted
fut.complete()
true
else:
false

proc joinPeer*[A, B](pool: PeerPool[A, B], peer: A): Future[void] =
  ## Returns a future which completes only when peer ``peer`` has finally
  ## left PeerPool ``pool``. If the peer is not (or no longer) present in
  ## the pool, the returned future completes on the next `poll()` call.
  mixin getKey
  var retFuture = newFuture[void]("peerpool.joinPeer")
  var future: Future[void]

  proc continuation(udata: pointer) {.gcsafe.} =
    if not(retFuture.finished()):
      retFuture.complete()

  proc cancellation(udata: pointer) {.gcsafe.} =
    # Detach our continuation from the peer's lifetime future if the
    # caller cancels `retFuture`.
    if not(isNil(future)):
      future.removeCallback(continuation)

  let key = getKey(peer)
  pool.registry.withValue(key, pindex):
    var item = addr(pool.storage[pindex[].data])
    future = item[].future
    # If peer is still in PeerPool, then item[].future should not be finished.
    doAssert(not(future.finished()))
    future.addCallback(continuation)
    retFuture.cancelCallback = cancellation
  do:
    # The peer is no longer in the pool, but the callbacks attached to
    # `PeerItem.future` may not have been processed yet, so to avoid a race
    # `retFuture` is completed only on the next `poll()` call.
    callSoon(continuation, cast[pointer](retFuture))
  retFuture

proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B,
peerType: PeerType) =
mixin getFuture
proc onPeerClosed(udata: pointer) {.gcsafe, raises: [Defect].} =
discard pool.deletePeer(peer)

let item = PeerItem[A](data: peer, peerType: peerType,
index: len(pool.storage))
index: len(pool.storage),
future: newFuture[void]("peerpool.peer"))
pool.storage.add(item)
var pitem = addr(pool.storage[^1])
let pindex = PeerIndex(data: item.index, cmp: pool.cmp)
Expand All @@ -377,13 +416,13 @@ proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} =
## * Peer's lifetime future is not finished yet - (PeerStatus.DeadPeerError)
##
## If peer could be added to PeerPool procedure returns (PeerStatus.Success)
mixin getKey, getFuture
mixin getKey, isAlive
if not(pool.checkPeerScore(peer)):
PeerStatus.LowScoreError
else:
let peerKey = getKey(peer)
if not(pool.registry.hasKey(peerKey)):
if not(peer.getFuture().finished):
if peer.isAlive():
PeerStatus.Success
else:
PeerStatus.DeadPeerError
Expand All @@ -403,7 +442,7 @@ proc addPeerNoWait*[A, B](pool: PeerPool[A, B],
## (PeerStatus.NoSpaceError)
##
## Procedure returns (PeerStatus.Success) on success.
mixin getKey, getFuture
mixin getKey
let res = pool.checkPeer(peer)
if res != PeerStatus.Success:
res
Expand Down
101 changes: 101 additions & 0 deletions tests/test_peer_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func getFuture(peer: PeerTest): Future[void] =
func `<`(a, b: PeerTest): bool =
`<`(a.weight, b.weight)

proc isAlive*(peer: PeerTest): bool =
  ## Test stand-in for `Peer.isAlive`: a test peer counts as alive while
  ## its lifetime future is unfinished.
  not(peer.future.finished())

proc init*(t: typedesc[PeerTest], id: string = "",
           weight: int = 0): PeerTest =
  ## Construct a test peer with the given identifier and weight and a
  ## fresh, unfinished lifetime future.
  result = PeerTest(id: id, weight: weight)
  result.future = newFuture[void]()
Expand Down Expand Up @@ -683,6 +686,104 @@ suite "PeerPool testing suite":

check waitFor(testDeleteOnRelease()) == true

test "Notify when peer leaves pool test":
proc testNotifyOnLeave(): Future[bool] {.async.} =
var pool = newPeerPool[PeerTest, PeerTestID]()
var peer0 = PeerTest.init("idInc0", 100)
var peer1 = PeerTest.init("idOut0", 100)
var peer2 = PeerTest.init("idInc1", 100)
var peer3 = PeerTest.init("idOut1", 100)

# Case 1. Deleting peer which is not part of PeerPool.
block:
var fut0 = pool.joinPeer(peer0)
doAssert(fut0.finished == false)
await sleepAsync(20.milliseconds)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we use the step polling here instead?

doAssert(fut0.finished == true and fut0.failed == false)

# Case 2. Deleting peer which is not acquired.
discard pool.addPeerNoWait(peer0, PeerType.Incoming)
block:
var fut0 = pool.joinPeer(peer0)
discard pool.deletePeer(peer0)
var fut1 = pool.joinPeer(peer0)
await sleepAsync(20.milliseconds)
var fut2 = pool.joinPeer(peer0)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == true and fut1.failed == false)
doAssert(fut2.finished == false)
await sleepAsync(20.milliseconds)
doAssert(fut2.finished == true and fut2.failed == false)

# Case 3. Peer disconnected while it wasn't acquired.
discard pool.addPeerNoWait(peer1, PeerType.Outgoing)
block:
var fut0 = pool.joinPeer(peer1)
# Peer disconnecting
peer1.future.complete()
var fut1 = pool.joinPeer(peer1)
await sleepAsync(20.milliseconds)
var fut2 = pool.joinPeer(peer1)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == true and fut1.failed == false)
doAssert(fut2.finished == false)
await sleepAsync(20.milliseconds)
doAssert(fut2.finished == true and fut2.failed == false)

# Case 4. Peer deleted when it was acquired.
discard pool.addPeerNoWait(peer2, PeerType.Incoming)
block:
var fut0 = pool.joinPeer(peer2)
var p = await pool.acquire()
doAssert(p.id == "idInc1")
var fut1 = pool.joinPeer(peer2)
discard pool.deletePeer(peer2)
await sleepAsync(20.milliseconds)
var fut2 = pool.joinPeer(peer2)
doAssert(fut0.finished == false)
doAssert(fut1.finished == false)
doAssert(fut2.finished == false)
pool.release(peer2)
var fut3 = pool.joinPeer(peer2)
await sleepAsync(20.milliseconds)
var fut4 = pool.joinPeer(peer2)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == true and fut1.failed == false)
doAssert(fut2.finished == true and fut2.failed == false)
doAssert(fut3.finished == true and fut3.failed == false)
doAssert(fut4.finished == false)
await sleepAsync(20.milliseconds)
doAssert(fut4.finished == true and fut4.failed == false)

# Case 5. Peer disconnected while it was acquired.
discard pool.addPeerNoWait(peer3, PeerType.Outgoing)
block:
var fut0 = pool.joinPeer(peer3)
var p = await pool.acquire()
doAssert(p.id == "idOut1")
var fut1 = pool.joinPeer(peer3)
# Peer disconnecting
peer3.future.complete()
await sleepAsync(20.milliseconds)
var fut2 = pool.joinPeer(peer3)
doAssert(fut0.finished == false)
doAssert(fut1.finished == false)
doAssert(fut2.finished == false)
pool.release(peer3)
var fut3 = pool.joinPeer(peer3)
await sleepAsync(20.milliseconds)
var fut4 = pool.joinPeer(peer3)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == true and fut1.failed == false)
doAssert(fut2.finished == true and fut2.failed == false)
doAssert(fut3.finished == true and fut3.failed == false)
doAssert(fut4.finished == false)
await sleepAsync(20.milliseconds)
doAssert(fut4.finished == true and fut4.failed == false)
return true

check waitFor(testNotifyOnLeave()) == true

test "Space tests":
var pool1 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79)
var pool2 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79,
Expand Down