-
Notifications
You must be signed in to change notification settings - Fork 293
Fix disconnect race. #2088
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
cheatfate
wants to merge
4
commits into
unstable
Choose a base branch
from
disconnect-race-fix
base: unstable
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Fix disconnect race. #2088
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -356,6 +356,9 @@ proc getFuture*(peer: Peer): Future[void] {.inline.} = | |
peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut") | ||
peer.disconnectedFut | ||
|
||
proc isAlive*(peer: Peer): bool =
  ## Returns ``true`` unless the peer's ``connectionState`` is either
  ## ``Disconnecting`` or ``Disconnected``.
  let state = peer.connectionState
  not(state == Disconnecting or state == Disconnected)
|
||
proc getScore*(a: Peer): int =
  ## Returns the current score value of peer ``a``.
  result = a.score
|
@@ -366,6 +369,43 @@ proc updateScore*(peer: Peer, score: int) {.inline.} = | |
if peer.score > PeerScoreHighLimit: | ||
peer.score = PeerScoreHighLimit | ||
|
||
proc join*(peer: Peer): Future[void] =
  ## Returns a future which is completed once the peer's disconnection
  ## future (as returned by ``peer.getFuture()``) has finished.
  ##
  ## If the disconnection future is already finished, completion is deferred
  ## via ``callSoon`` instead of happening immediately.
  var retFuture = newFuture[void]("peer.lifetime.join")
  let peerFut = peer.getFuture()

  proc continuation(udata: pointer) {.gcsafe.} =
    # `udata` is not used: `retFuture` is captured by the closure.
    if not(retFuture.finished()):
      retFuture.complete()

  proc cancellation(udata: pointer) {.gcsafe.} =
    # Detach from the peer's future so `continuation` is not invoked for a
    # cancelled `retFuture`.
    if not(isNil(peerFut)):
      peerFut.removeCallback(continuation)

  if peerFut.finished():
    # All the `peer.disconnectedFut` callbacks are already scheduled in the
    # current `poll()` call; to avoid a race we finish only in the next
    # `poll()` call. No user data is needed, so `nil` is passed instead of
    # casting `retFuture` to a raw pointer.
    callSoon(continuation, nil)
  else:
    # `peer.disconnectedFut` is not yet finished, but we want to be scheduled
    # after all of its callbacks.
    peerFut.addCallback(continuation)
    # NOTE(review): nesting was reconstructed from a flattened diff; confirm
    # whether this assignment belongs inside the `else` branch.
    retFuture.cancelCallback = cancellation

  retFuture
|
||
proc notifyAndWait*(network: Eth2Node, peer: Peer): Future[void] =
  ## Notify all the waiters that the peer's life is finished and return a
  ## future which completes when both ``peer.join()`` and
  ## ``network.peerPool.joinPeer(peer)`` have completed.
  let
    # Both join futures are created before `disconnectedFut` is completed
    # below.
    joinFut = peer.join()
    poolFut = network.peerPool.joinPeer(peer)
    discFut = peer.disconnectedFut
  peer.connectionState = Disconnecting
  discFut.complete()
  # Drop the completed future from the peer.
  peer.disconnectedFut = nil
  allFutures(joinFut, poolFut)
|
||
proc calcThroughput(dur: Duration, value: uint64): float = | ||
let secs = float(chronos.seconds(1).nanoseconds) | ||
if isZero(dur): | ||
|
@@ -1183,8 +1223,6 @@ proc resolvePeer(peer: Peer) = | |
discard peer.peerId.extractPublicKey(key) | ||
keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId() | ||
|
||
debug "Peer's ENR recovery task started", node_id = $nodeId | ||
|
||
# This is "fast-path" for peers which was dialed. In this case discovery | ||
# already has most recent ENR information about this peer. | ||
let gnode = peer.network.discovery.getNode(nodeId) | ||
|
@@ -1194,6 +1232,9 @@ proc resolvePeer(peer: Peer) = | |
let delay = now(chronos.Moment) - startTime | ||
nbc_resolve_time.observe(delay.toFloatSeconds()) | ||
debug "Peer's ENR recovered", delay | ||
else: | ||
inc(nbc_failed_discoveries) | ||
debug "Peer's ENR could not be recovered" | ||
|
||
proc handlePeer*(peer: Peer) {.async.} = | ||
let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction) | ||
|
@@ -1222,8 +1263,7 @@ proc handlePeer*(peer: Peer) {.async.} = | |
# Peer was added to PeerPool. | ||
peer.score = NewPeerScore | ||
peer.connectionState = Connected | ||
# We spawn task which will obtain ENR for this peer. | ||
resolvePeer(peer) | ||
peer.resolvePeer() | ||
debug "Peer successfully connected", peer = peer, | ||
connections = peer.connections | ||
|
||
|
@@ -1290,10 +1330,8 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} = | |
# Whatever caused disconnection, avoid connection spamming | ||
node.addSeen(peerId, SeenTableTimeReconnect) | ||
|
||
let fut = peer.disconnectedFut | ||
if not(isNil(fut)): | ||
fut.complete() | ||
peer.disconnectedFut = nil | ||
if not(isNil(peer.disconnectedFut)): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: shouldn't this check be inside `notifyAndWait`? |
||
await node.notifyAndWait(peer) | ||
else: | ||
# TODO (cheatfate): This could be removed when bug will be fixed inside | ||
# `nim-libp2p`. | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,9 @@ func getFuture(peer: PeerTest): Future[void] = | |
func `<`(a, b: PeerTest): bool =
  ## Orders test peers by comparing their ``weight`` fields.
  a.weight < b.weight
|
||
proc isAlive*(peer: PeerTest): bool =
  ## A test peer counts as alive for as long as its ``future`` has not
  ## finished.
  let done = peer.future.finished()
  not(done)
|
||
proc init*(t: typedesc[PeerTest], id: string = "",
           weight: int = 0): PeerTest =
  ## Creates a ``PeerTest`` with the given ``id`` and ``weight`` and a newly
  ## created ``Future[void]`` in its ``future`` field.
  result = PeerTest(id: id, weight: weight, future: newFuture[void]())
|
@@ -683,6 +686,104 @@ suite "PeerPool testing suite": | |
|
||
check waitFor(testDeleteOnRelease()) == true | ||
|
||
test "Notify when peer leaves pool test":
  proc testNotifyOnLeave(): Future[bool] {.async.} =
    # Delay awaited between the individual state checks below.
    let tick = 20.milliseconds

    var pool = newPeerPool[PeerTest, PeerTestID]()
    var peer0 = PeerTest.init("idInc0", 100)
    var peer1 = PeerTest.init("idOut0", 100)
    var peer2 = PeerTest.init("idInc1", 100)
    var peer3 = PeerTest.init("idOut1", 100)

    # Case 1. Peer which is not part of the PeerPool.
    block:
      let fut0 = pool.joinPeer(peer0)
      doAssert(not(fut0.finished()))
      # Review question: can't we use step polling here instead of sleeping?
      await sleepAsync(tick)
      doAssert(fut0.finished() and not(fut0.failed()))

    # Case 2. Deleting a peer which is not acquired.
    discard pool.addPeerNoWait(peer0, PeerType.Incoming)
    block:
      let fut0 = pool.joinPeer(peer0)
      discard pool.deletePeer(peer0)
      let fut1 = pool.joinPeer(peer0)
      await sleepAsync(tick)
      let fut2 = pool.joinPeer(peer0)
      doAssert(fut0.finished() and not(fut0.failed()))
      doAssert(fut1.finished() and not(fut1.failed()))
      doAssert(not(fut2.finished()))
      await sleepAsync(tick)
      doAssert(fut2.finished() and not(fut2.failed()))

    # Case 3. Peer disconnected while it wasn't acquired.
    discard pool.addPeerNoWait(peer1, PeerType.Outgoing)
    block:
      let fut0 = pool.joinPeer(peer1)
      # Peer disconnecting.
      peer1.future.complete()
      let fut1 = pool.joinPeer(peer1)
      await sleepAsync(tick)
      let fut2 = pool.joinPeer(peer1)
      doAssert(fut0.finished() and not(fut0.failed()))
      doAssert(fut1.finished() and not(fut1.failed()))
      doAssert(not(fut2.finished()))
      await sleepAsync(tick)
      doAssert(fut2.finished() and not(fut2.failed()))

    # Case 4. Peer deleted while it was acquired.
    discard pool.addPeerNoWait(peer2, PeerType.Incoming)
    block:
      let fut0 = pool.joinPeer(peer2)
      let acquired = await pool.acquire()
      doAssert(acquired.id == "idInc1")
      let fut1 = pool.joinPeer(peer2)
      discard pool.deletePeer(peer2)
      await sleepAsync(tick)
      let fut2 = pool.joinPeer(peer2)
      # Nothing may complete while the peer is still acquired.
      doAssert(not(fut0.finished()))
      doAssert(not(fut1.finished()))
      doAssert(not(fut2.finished()))
      pool.release(peer2)
      let fut3 = pool.joinPeer(peer2)
      await sleepAsync(tick)
      let fut4 = pool.joinPeer(peer2)
      doAssert(fut0.finished() and not(fut0.failed()))
      doAssert(fut1.finished() and not(fut1.failed()))
      doAssert(fut2.finished() and not(fut2.failed()))
      doAssert(fut3.finished() and not(fut3.failed()))
      doAssert(not(fut4.finished()))
      await sleepAsync(tick)
      doAssert(fut4.finished() and not(fut4.failed()))

    # Case 5. Peer disconnected while it was acquired.
    discard pool.addPeerNoWait(peer3, PeerType.Outgoing)
    block:
      let fut0 = pool.joinPeer(peer3)
      let acquired = await pool.acquire()
      doAssert(acquired.id == "idOut1")
      let fut1 = pool.joinPeer(peer3)
      # Peer disconnecting.
      peer3.future.complete()
      await sleepAsync(tick)
      let fut2 = pool.joinPeer(peer3)
      # Nothing may complete while the peer is still acquired.
      doAssert(not(fut0.finished()))
      doAssert(not(fut1.finished()))
      doAssert(not(fut2.finished()))
      pool.release(peer3)
      let fut3 = pool.joinPeer(peer3)
      await sleepAsync(tick)
      let fut4 = pool.joinPeer(peer3)
      doAssert(fut0.finished() and not(fut0.failed()))
      doAssert(fut1.finished() and not(fut1.failed()))
      doAssert(fut2.finished() and not(fut2.failed()))
      doAssert(fut3.finished() and not(fut3.failed()))
      doAssert(not(fut4.finished()))
      await sleepAsync(tick)
      doAssert(fut4.finished() and not(fut4.failed()))
    return true

  check waitFor(testNotifyOnLeave()) == true
|
||
test "Space tests": | ||
var pool1 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79) | ||
var pool2 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79, | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`continuation` does not use its `udata` pointer - why include a pointer here?