Skip to content

Commit 443da8e

Browse files
authored
Support multiple wire protocol version and implement eth/68 + eth/69 (#3283)
* Remove old wire protocol implementation * eth68 status isolated * eth69 preparation * Fix typo * Register protocol * Add BlockRangeUpdate * Use new receipt format for eth69 * Fix tests * Update wire protocol setup * Update syncer addObserver * Update peer observer * Handle blockRangeUpdate using peer state * Add receipt69 roundtrip test * Replace Receipt69 with StoredReceipt from nim-eth * Bump nim-eth * Bump nim-eth to master branch
1 parent 152f3cd commit 443da8e

File tree

17 files changed

+409
-425
lines changed

17 files changed

+409
-425
lines changed

execution_chain/networking/p2p_types.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ type
7878
PeerObserver* = object
7979
onPeerConnected*: proc(p: Peer) {.gcsafe, raises: [].}
8080
onPeerDisconnected*: proc(p: Peer) {.gcsafe, raises: [].}
81-
protocol*: ProtocolInfo
81+
protocols*: seq[ProtocolInfo]
8282

8383
Capability* = object
8484
name*: string

execution_chain/networking/peer_pool.nim

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ proc addObserver*(p: PeerPool, observerId: int, observer: PeerObserver) =
8787
p.observers[observerId] = observer
8888
if not observer.onPeerConnected.isNil:
8989
for peer in p.connectedNodes.values:
90-
if observer.protocol.isNil or peer.supports(observer.protocol):
90+
if observer.protocols.len == 0 or peer.supports(observer.protocols):
9191
observer.onPeerConnected(peer)
9292
9393
func delObserver*(p: PeerPool, observerId: int) =
@@ -99,19 +99,15 @@ proc addObserver*(p: PeerPool, observerId: ref, observer: PeerObserver) =
9999
func delObserver*(p: PeerPool, observerId: ref) =
100100
p.delObserver(cast[int](observerId))
101101
102-
template setProtocol*(observer: PeerObserver, Protocol: type) =
103-
observer.protocol = Protocol.protocolInfo
102+
template addProtocol*(observer: PeerObserver, Protocol: type) =
103+
observer.protocols.add Protocol.protocolInfo
104104
105105
proc stopAllPeers(p: PeerPool) {.async.} =
106106
debug "Stopping all peers ..."
107107
# TODO: ...
108108
# await asyncio.gather(
109109
# *[peer.stop() for peer in self.connected_nodes.values()])
110110

111-
# async def stop(self) -> None:
112-
# self.cancel_token.trigger()
113-
# await self.stop_all_peers()
114-
115111
proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
116112
## Connect to the given remote and return a Peer instance when successful.
117113
## Returns nil if the remote is unreachable, times out or is useless.
@@ -166,10 +162,10 @@ proc addPeer*(pool: PeerPool, peer: Peer) {.gcsafe.} =
166162
doAssert(peer.remote notin pool.connectedNodes)
167163
pool.connectedNodes[peer.remote] = peer
168164
rlpx_connected_peers.inc()
169-
for o in pool.observers.values:
170-
if not o.onPeerConnected.isNil:
171-
if o.protocol.isNil or peer.supports(o.protocol):
172-
o.onPeerConnected(peer)
165+
for observer in pool.observers.values:
166+
if not observer.onPeerConnected.isNil:
167+
if observer.protocols.len == 0 or peer.supports(observer.protocols):
168+
observer.onPeerConnected(peer)
173169

174170
proc connectToNode*(p: PeerPool, n: Node) {.async.} =
175171
let peer = await p.connect(n)

execution_chain/networking/rlpx.nim

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,11 @@ proc supports*(peer: Peer, Protocol: type): bool =
365365
## Checks whether a Peer supports a particular protocol
366366
peer.supports(Protocol.protocolInfo)
367367
368+
proc supports*(peer: Peer, protos: openArray[ProtocolInfo]): bool =
369+
for proto in protos:
370+
if peer.supports(proto):
371+
return true
372+
368373
template perPeerMsgId(peer: Peer, MsgType: type): uint64 =
369374
perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId)
370375
@@ -981,7 +986,7 @@ proc removePeer(network: EthereumNode, peer: Peer) =
981986
if not peer.dispatcher.isNil:
982987
for observer in network.peerPool.observers.values:
983988
if not observer.onPeerDisconnected.isNil:
984-
if observer.protocol.isNil or peer.supports(observer.protocol):
989+
if observer.protocols.len == 0 or peer.supports(observer.protocols):
985990
observer.onPeerDisconnected(peer)
986991

987992
proc callDisconnectHandlers(
@@ -1557,6 +1562,26 @@ template rlpxWithFutureHandler*(PROTO: distinct type;
15571562
resolveResponseFuture(peer,
15581563
perPeerMsgId, addr(packet), reqId)
15591564

1565+
template rlpxWithFutureHandler*(PROTO: distinct type;
1566+
MSGTYPE: distinct type;
1567+
PROTYPE: distinct type;
1568+
msgId: static[uint64];
1569+
peer: Peer;
1570+
data: Rlp,
1571+
fields: untyped): untyped =
1572+
wrapRlpxWithPacketException(MSGTYPE, peer):
1573+
var
1574+
rlp = data
1575+
packet: MSGTYPE
1576+
1577+
tryEnterList(rlp)
1578+
let
1579+
reqId = read(rlp, uint64)
1580+
perPeerMsgId = msgIdImpl(PROTO, peer, msgId)
1581+
checkedRlpFields(peer, rlp, packet, fields)
1582+
var proType = packet.to(PROTYPE)
1583+
resolveResponseFuture(peer,
1584+
perPeerMsgId, addr(proType), reqId)
15601585

15611586
proc nextMsg*(PROTO: distinct type,
15621587
peer: Peer,
@@ -1595,6 +1620,9 @@ func initResponder*(peer: Peer, reqId: uint64): Responder =
15951620
template state*(response: Responder, PROTO: type): auto =
15961621
state(response.peer, PROTO)
15971622

1623+
template supports*(response: Responder, Protocol: type): bool =
1624+
response.peer.supports(Protocol.protocolInfo)
1625+
15981626
template networkState*(response: Responder, PROTO: type): auto =
15991627
networkState(response.peer, PROTO)
16001628

@@ -1618,6 +1646,9 @@ template defineProtocol*(PROTO: untyped,
16181646
template NetworkState*(_: type PROTO): type =
16191647
networkState
16201648

1649+
template protocolVersion*(_: type PROTO): int =
1650+
version
1651+
16211652
func initProtocol*(_: type PROTO): auto =
16221653
initProtocol(rlpxName,
16231654
version,

execution_chain/sync/beacon/worker/start_stop.nim

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,15 @@ proc startBuddy*(buddy: BeaconBuddyRef): bool =
9494
let
9595
ctx = buddy.ctx
9696
peer = buddy.peer
97-
if peer.supports(wire_protocol.eth) and
98-
peer.state(wire_protocol.eth).initialized:
97+
98+
if peer.supports(eth69) and
99+
peer.state(eth69).initialized:
100+
ctx.pool.nBuddies.inc
101+
buddy.initHdrProcErrors()
102+
return true
103+
104+
if peer.supports(eth68) and
105+
peer.state(eth68).initialized:
99106
ctx.pool.nBuddies.inc
100107
ctx.pool.blkLastSlowPeer = Opt.none(Hash)
101108
buddy.initHdrProcErrors()

execution_chain/sync/peers.nim

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ proc setupManager(pm: PeerManagerRef, enodes: openArray[ENode]) =
9292
pm.state = Running
9393
pm.reconnectFut = pm.runReconnectLoop()
9494

95-
po.setProtocol eth
95+
po.addProtocol eth68
96+
po.addProtocol eth69
9697
pm.pool.addObserver(pm, po)
9798

9899
for enode in enodes:

execution_chain/sync/sync_sched.nim

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ import
8888
chronos,
8989
../networking/[p2p, peer_pool],
9090
stew/keyed_queue,
91-
./sync_desc
91+
./sync_desc,
92+
./wire_protocol
9293

9394
type
9495
ActiveBuddies[S,W] = ##\
@@ -503,8 +504,10 @@ proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool =
503504
onPeerDisconnected: proc(p: Peer) {.gcsafe.} =
504505
dsc.onPeerDisconnected(p))
505506
506-
po.setProtocol eth
507+
po.addProtocol eth68
508+
po.addProtocol eth69
507509
dsc.pool.addObserver(dsc, po)
510+
508511
asyncSpawn dsc.tickerLoop()
509512
return true
510513

execution_chain/sync/wire_protocol.nim

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,3 @@ export
1818
responder,
1919
types,
2020
setup
21-
22-
type
23-
eth* = eth68

execution_chain/sync/wire_protocol/handler.nim

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,14 @@ proc new*(_: type EthWireRef,
4444
# Public functions: eth wire protocol handlers
4545
# ------------------------------------------------------------------------------
4646

47-
proc getStatus*(ctx: EthWireRef): EthState =
47+
proc getStatus68*(ctx: EthWireRef): Eth68State =
4848
let
4949
com = ctx.chain.com
5050
bestBlock = ctx.chain.latestHeader
5151
txFrame = ctx.chain.baseTxFrame
5252
forkId = com.forkId(bestBlock.number, bestBlock.timestamp)
5353

54-
EthState(
54+
Eth68State(
5555
totalDifficulty: txFrame.headTotalDifficulty,
5656
genesisHash: com.genesisHash,
5757
bestBlockHash: bestBlock.computeBlockHash,
@@ -60,6 +60,23 @@ proc getStatus*(ctx: EthWireRef): EthState =
6060
forkNext: forkId.nextFork
6161
))
6262

63+
proc getStatus69*(ctx: EthWireRef): Eth69State =
64+
let
65+
com = ctx.chain.com
66+
bestBlock = ctx.chain.latestHeader
67+
forkId = com.forkId(bestBlock.number, bestBlock.timestamp)
68+
69+
Eth69State(
70+
genesisHash: com.genesisHash,
71+
forkId: ChainForkId(
72+
forkHash: forkId.crc.toBytesBE,
73+
forkNext: forkId.nextFork
74+
),
75+
earliest: 0,
76+
latest: bestBlock.number,
77+
latestHash: bestBlock.computeBlockHash,
78+
)
79+
6380
proc getReceipts*(ctx: EthWireRef,
6481
hashes: openArray[Hash32]):
6582
seq[seq[Receipt]] =
@@ -80,6 +97,26 @@ proc getReceipts*(ctx: EthWireRef,
8097

8198
move(list)
8299

100+
proc getStoredReceipts*(ctx: EthWireRef,
101+
hashes: openArray[Hash32]):
102+
seq[seq[StoredReceipt]] =
103+
var
104+
list: seq[seq[StoredReceipt]]
105+
totalBytes = 0
106+
107+
for blockHash in hashes:
108+
var receiptList = ctx.chain.receiptsByBlockHash(blockHash).valueOr:
109+
continue
110+
111+
totalBytes += getEncodedLength(receiptList)
112+
list.add(receiptList.to(seq[StoredReceipt]))
113+
114+
if list.len >= MAX_RECEIPTS_SERVE or
115+
totalBytes > SOFT_RESPONSE_LIMIT:
116+
break
117+
118+
move(list)
119+
83120
proc getPooledTransactions*(ctx: EthWireRef,
84121
hashes: openArray[Hash32]):
85122
seq[PooledTransaction] =
@@ -127,7 +164,7 @@ proc getBlockBodies*(ctx: EthWireRef,
127164
if blk.header.number > ctx.chain.portal.limit:
128165
trace "handlers.getBlockBodies: blockBody older than expiry limit", blockHash
129166
continue
130-
167+
131168
totalBytes += getEncodedLength(blk.body)
132169
list.add blk.body
133170

0 commit comments

Comments
 (0)