Skip to content

Commit ee0a025

Browse files
committed
Add peerpool.joinPeer() and tests.
notifyAndWait() now waits PeerPool and disconnection.
1 parent 3c116da commit ee0a025

File tree

3 files changed

+154
-12
lines changed

3 files changed

+154
-12
lines changed

beacon_chain/eth2_network.nim

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -359,17 +359,16 @@ proc updateScore*(peer: Peer, score: int) {.inline.} =
359359
proc join*(peer: Peer): Future[void] =
360360
var retFuture = newFuture[void]("peer.lifetime.join")
361361
let peerFut = peer.getFuture()
362-
let alreadyFinished = peerFut.finished()
363362

364363
proc continuation(udata: pointer) {.gcsafe.} =
365364
if not(retFuture.finished()):
366365
retFuture.complete()
367366

368367
proc cancellation(udata: pointer) {.gcsafe.} =
369-
if not(alreadyFinished):
368+
if not(isNil(peerFut)):
370369
peerFut.removeCallback(continuation)
371370

372-
if alreadyFinished:
371+
if peerFut.finished():
373372
# All the `peer.disconnectedFut` callbacks are already scheduled in current
374373
# `poll()` call, to avoid race we going to finish only in next `poll()`
375374
# call.
@@ -378,18 +377,21 @@ proc join*(peer: Peer): Future[void] =
378377
# `peer.disconnectedFut` is not yet finished, but we want to be scheduled
379378
# after all callbacks.
380379
peerFut.addCallback(continuation)
380+
retFuture.cancelCallback = cancellation
381381

382-
return retFuture
382+
retFuture
383383

384-
proc notifyAndWait*(peer: Peer): Future[void] =
384+
proc notifyAndWait*(network: ETh2Node, peer: Peer): Future[void] =
385385
## Notify all the waiters that peer life is finished and wait until all
386386
## callbacks will be processed.
387-
let joinFut = peer.join()
388-
let fut = peer.disconnectedFut
387+
let
388+
joinFut = peer.join()
389+
poolFut = network.peerPool.joinPeer(peer)
390+
discFut = peer.disconnectedFut
389391
peer.connectionState = Disconnecting
390-
fut.complete()
392+
discFut.complete()
391393
peer.disconnectedFut = nil
392-
joinFut
394+
allFutures(joinFut, poolFut)
393395

394396
proc calcThroughput(dur: Duration, value: uint64): float =
395397
let secs = float(chronos.seconds(1).nanoseconds)
@@ -1087,7 +1089,7 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
10871089
node.addSeen(peerId, SeenTableTimeReconnect)
10881090

10891091
if not(isNil(peer.disconnectedFut)):
1090-
await peer.notifyAndWait()
1092+
await node.notifyAndWait(peer)
10911093
else:
10921094
# TODO (cheatfate): This could be removed when bug will be fixed inside
10931095
# `nim-libp2p`.

beacon_chain/peer_pool.nim

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type
2525
peerType: PeerType
2626
flags: set[PeerFlags]
2727
index: int
28+
future: Future[void]
2829

2930
PeerIndex = object
3031
data: int
@@ -301,13 +302,16 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
301302
dec(pool.curOutPeersCount)
302303
dec(pool.acqOutPeersCount)
303304

305+
let fut = item[].future
304306
# Indicate that we have an empty space
305307
pool.fireNotFullEvent(item[])
306308
# Cleanup storage with default item, and removing key from hashtable.
307309
pool.storage[pindex] = PeerItem[A]()
308310
pool.registry.del(key)
309311
pool.peerDeleted(peer)
310312
pool.peerCountChanged()
313+
# Indicate that peer was deleted
314+
fut.complete()
311315
else:
312316
if item[].peerType == PeerType.Incoming:
313317
# If peer is available, then its copy present in heapqueue, so we need
@@ -326,25 +330,61 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
326330
break
327331
dec(pool.curOutPeersCount)
328332

333+
let fut = item[].future
329334
# Indicate that we have an empty space
330335
pool.fireNotFullEvent(item[])
331336
# Cleanup storage with default item, and removing key from hashtable.
332337
pool.storage[pindex] = PeerItem[A]()
333338
pool.registry.del(key)
334339
pool.peerDeleted(peer)
335340
pool.peerCountChanged()
341+
# Indicate that peer was deleted
342+
fut.complete()
336343
true
337344
else:
338345
false
339346

347+
proc joinPeer*[A, B](pool: PeerPool[A, B], peer: A): Future[void] =
348+
## This procedure will only when peer ``peer`` finally leaves PeerPool
349+
## ``pool``.
350+
mixin getKey
351+
var retFuture = newFuture[void]("peerpool.joinPeer")
352+
var future: Future[void]
353+
354+
proc continuation(udata: pointer) {.gcsafe.} =
355+
if not(retFuture.finished()):
356+
retFuture.complete()
357+
358+
proc cancellation(udata: pointer) {.gcsafe.} =
359+
if not(isNil(future)):
360+
future.removeCallback(continuation)
361+
362+
let key = getKey(peer)
363+
if pool.registry.hasKey(key):
364+
let pindex = pool.registry[key].data
365+
var item = addr(pool.storage[pindex])
366+
future = item[].future
367+
# If peer is still in PeerPool, then item[].future should not be finished.
368+
doAssert(not(future.finished()))
369+
future.addCallback(continuation)
370+
retFuture.cancelCallback = cancellation
371+
else:
372+
# If there no such peer in PeerPool anymore, its possible that
373+
# PeerItem.future's callbacks is not yet processed, so we going to complete
374+
# retFuture only in next `poll()` call.
375+
callSoon(continuation, cast[pointer](retFuture))
376+
377+
retFuture
378+
340379
proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B,
341380
peerType: PeerType) =
342381
mixin getFuture
343382
proc onPeerClosed(udata: pointer) {.gcsafe.} =
344383
discard pool.deletePeer(peer)
345384

346385
let item = PeerItem[A](data: peer, peerType: peerType,
347-
index: len(pool.storage))
386+
index: len(pool.storage),
387+
future: newFuture[void]("peerpool.peer"))
348388
pool.storage.add(item)
349389
var pitem = addr(pool.storage[^1])
350390
let pindex = PeerIndex(data: item.index, cmp: pool.cmp)

tests/test_peer_pool.nim

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ suiteReport "PeerPool testing suite":
652652
lenAcquired(pool) == 0
653653
len(pool) == 0
654654

655-
timedTest "Delete peer on release text":
655+
timedTest "Delete peer on release test":
656656
proc testDeleteOnRelease(): Future[bool] {.async.} =
657657
proc scoreCheck(peer: PeerTest): bool =
658658
if peer.weight >= 0:
@@ -686,6 +686,106 @@ suiteReport "PeerPool testing suite":
686686

687687
check waitFor(testDeleteOnRelease()) == true
688688

689+
timedTest "Notify when peer leaves pool test":
690+
proc testNotifyOnLeave(): Future[bool] {.async.} =
691+
692+
var pool = newPeerPool[PeerTest, PeerTestID]()
693+
var peer0 = PeerTest.init("idInc0", 100)
694+
var peer1 = PeerTest.init("idOut0", 100)
695+
var peer2 = PeerTest.init("idInc1", 100)
696+
var peer3 = PeerTest.init("idOut1", 100)
697+
698+
# Case 1. Deleting peer which is not part of PeerPool.
699+
block:
700+
var fut0 = pool.joinPeer(peer0)
701+
doAssert(fut0.finished == false)
702+
await sleepAsync(20.milliseconds)
703+
doAssert(fut0.finished == true and fut0.failed == false)
704+
705+
# Case 2. Deleting peer which is not acquired.
706+
discard pool.addPeerNoWait(peer0, PeerType.Incoming)
707+
block:
708+
var fut0 = pool.joinPeer(peer0)
709+
discard pool.deletePeer(peer0)
710+
var fut1 = pool.joinPeer(peer0)
711+
await sleepAsync(20.milliseconds)
712+
var fut2 = pool.joinPeer(peer0)
713+
doAssert(fut0.finished == true and fut0.failed == false)
714+
doAssert(fut1.finished == true and fut1.failed == false)
715+
doAssert(fut2.finished == false)
716+
await sleepAsync(20.milliseconds)
717+
doAssert(fut2.finished == true and fut2.failed == false)
718+
719+
# Case 3. Peer disconnected while it wasn't acquired.
720+
discard pool.addPeerNoWait(peer1, PeerType.Outgoing)
721+
block:
722+
var fut0 = pool.joinPeer(peer1)
723+
# Peer disconnecting
724+
peer1.future.complete()
725+
var fut1 = pool.joinPeer(peer1)
726+
await sleepAsync(20.milliseconds)
727+
var fut2 = pool.joinPeer(peer1)
728+
doAssert(fut0.finished == true and fut0.failed == false)
729+
doAssert(fut1.finished == true and fut1.failed == false)
730+
doAssert(fut2.finished == false)
731+
await sleepAsync(20.milliseconds)
732+
doAssert(fut2.finished == true and fut2.failed == false)
733+
734+
# Case 4. Peer deleted when it was acquired.
735+
discard pool.addPeerNoWait(peer2, PeerType.Incoming)
736+
block:
737+
var fut0 = pool.joinPeer(peer2)
738+
var p = await pool.acquire()
739+
doAssert(p.id == "idInc1")
740+
var fut1 = pool.joinPeer(peer2)
741+
discard pool.deletePeer(peer2)
742+
await sleepAsync(20.milliseconds)
743+
var fut2 = pool.joinPeer(peer2)
744+
doAssert(fut0.finished == false)
745+
doAssert(fut1.finished == false)
746+
doAssert(fut2.finished == false)
747+
pool.release(peer2)
748+
var fut3 = pool.joinPeer(peer2)
749+
await sleepAsync(20.milliseconds)
750+
var fut4 = pool.joinPeer(peer2)
751+
doAssert(fut0.finished == true and fut0.failed == false)
752+
doAssert(fut1.finished == true and fut1.failed == false)
753+
doAssert(fut2.finished == true and fut2.failed == false)
754+
doAssert(fut3.finished == true and fut3.failed == false)
755+
doAssert(fut4.finished == false)
756+
await sleepAsync(20.milliseconds)
757+
doAssert(fut4.finished == true and fut4.failed == false)
758+
759+
# Case 5. Peer disconnected while it was acquired.
760+
discard pool.addPeerNoWait(peer3, PeerType.Outgoing)
761+
block:
762+
var fut0 = pool.joinPeer(peer3)
763+
var p = await pool.acquire()
764+
doAssert(p.id == "idOut1")
765+
var fut1 = pool.joinPeer(peer3)
766+
# Peer disconnecting
767+
peer3.future.complete()
768+
await sleepAsync(20.milliseconds)
769+
var fut2 = pool.joinPeer(peer3)
770+
doAssert(fut0.finished == false)
771+
doAssert(fut1.finished == false)
772+
doAssert(fut2.finished == false)
773+
pool.release(peer3)
774+
var fut3 = pool.joinPeer(peer3)
775+
await sleepAsync(20.milliseconds)
776+
var fut4 = pool.joinPeer(peer3)
777+
doAssert(fut0.finished == true and fut0.failed == false)
778+
doAssert(fut1.finished == true and fut1.failed == false)
779+
doAssert(fut2.finished == true and fut2.failed == false)
780+
doAssert(fut3.finished == true and fut3.failed == false)
781+
doAssert(fut4.finished == false)
782+
await sleepAsync(20.milliseconds)
783+
doAssert(fut4.finished == true and fut4.failed == false)
784+
785+
result = true
786+
787+
check waitFor(testNotifyOnLeave()) == true
788+
689789
timedTest "Space tests":
690790
var pool1 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79)
691791
var pool2 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79,

0 commit comments

Comments
 (0)