Skip to content

Commit 24171c3

Browse files
committed
Quick and dirty PoC for syncing from Portal history network
This PR is not intended to get merged. It is a very quick and dirty implementation with the intention of testing the Portal network and Fluffy code and to verify how long downloads of blocks take in comparison with the execution of them. It kind of abuses the current import from era code to do this. I think (?) in an improved version the block downloads should probably lead the implementation and trigger execution (it is a bit the reverse right now, which makes sense for era files). Perhaps that way the execution could even be offloaded to another thread? It is also coded without using the JSON-RPC API, as I found that easier for a quick version. But the getBlock call could be changed to use the json-rpc alternative.
1 parent f033a40 commit 24171c3

File tree

3 files changed

+211
-10
lines changed

3 files changed

+211
-10
lines changed

nimbus/config.nim

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,12 +559,19 @@ type
559559
defaultValue: false
560560
name: "debug-store-slot-hashes".}: bool
561561

562+
usePortal* {.
563+
hidden
564+
desc: "Use portal network instead of era files"
565+
defaultValue: false
566+
name: "debug-use-portal".}: bool
567+
562568
of `import-rlp`:
563569
blocksFile* {.
564570
argument
565571
desc: "One or more RLP encoded block(s) files"
566572
name: "blocks-file" }: seq[InputFile]
567573

574+
568575
func parseCmdArg(T: type NetworkId, p: string): T
569576
{.gcsafe, raises: [ValueError].} =
570577
parseBiggestUInt(p).T

nimbus/nimbus_execution_client.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
256256

257257
case conf.cmd
258258
of NimbusCmd.`import`:
259-
importBlocks(conf, com)
259+
importBlocksPortal(conf, com)
260260
of NimbusCmd.`import-rlp`:
261261
importRlpBlocks(conf, com)
262262
else:

nimbus/nimbus_import.nim

Lines changed: 203 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,25 @@ import
1313
chronicles,
1414
metrics,
1515
chronos/timer,
16-
std/[strformat, strutils],
16+
chronos,
17+
std/[strformat, strutils, os],
1718
stew/io2,
1819
beacon_chain/era_db,
1920
beacon_chain/networking/network_metadata,
2021
./config,
2122
./common/common,
2223
./core/chain,
2324
./db/era1_db,
24-
./utils/era_helpers
25+
./utils/era_helpers,
26+
eth/common/keys, # rng
27+
eth/net/nat, # setupAddress
28+
eth/p2p/discoveryv5/protocol as discv5_protocol,
29+
eth/p2p/discoveryv5/routing_table,
30+
eth/p2p/discoveryv5/enr,
31+
../fluffy/portal_node,
32+
../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
33+
../fluffy/network_metadata,
34+
../fluffy/version
2535

2636
declareGauge nec_import_block_number, "Latest imported block number"
2737

@@ -87,7 +97,164 @@ template boolFlag(flags, b): PersistBlockFlags =
8797
else:
8898
{}
8999

90-
proc importBlocks*(conf: NimbusConf, com: CommonRef) =
100+
proc run(config: NimbusConf): PortalNode {.
101+
raises: [CatchableError]
102+
.} =
103+
let rng = newRng()
104+
105+
## Network configuration
106+
let
107+
bindIp = config.listenAddress
108+
udpPort = Port(config.udpPort)
109+
# TODO: allow for no TCP port mapping!
110+
(extIp, _, extUdpPort) =
111+
try:
112+
setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
113+
except CatchableError as exc:
114+
raiseAssert exc.msg
115+
# raise exc # TODO: Ideally we don't have the Exception here
116+
except Exception as exc:
117+
raiseAssert exc.msg
118+
(netkey, newNetKey) =
119+
# if config.netKey.isSome():
120+
# (config.netKey.get(), true)
121+
# else:
122+
getPersistentNetKey(rng[], config.dataDir / "netkey")
123+
124+
enrFilePath = config.dataDir / "nimbus_portal_node.enr"
125+
previousEnr =
126+
if not newNetKey:
127+
getPersistentEnr(enrFilePath)
128+
else:
129+
Opt.none(enr.Record)
130+
131+
var bootstrapRecords: seq[Record]
132+
# loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
133+
# bootstrapRecords.add(config.bootstrapNodes)
134+
135+
# case config.network
136+
# of PortalNetwork.none:
137+
# discard # don't connect to any network bootstrap nodes
138+
# of PortalNetwork.mainnet:
139+
# for enrURI in mainnetBootstrapNodes:
140+
# let res = enr.Record.fromURI(enrURI)
141+
# if res.isOk():
142+
# bootstrapRecords.add(res.value)
143+
# of PortalNetwork.angelfood:
144+
# for enrURI in angelfoodBootstrapNodes:
145+
# let res = enr.Record.fromURI(enrURI)
146+
# if res.isOk():
147+
# bootstrapRecords.add(res.value)
148+
149+
# Only mainnet
150+
for enrURI in mainnetBootstrapNodes:
151+
let res = enr.Record.fromURI(enrURI)
152+
if res.isOk():
153+
bootstrapRecords.add(res.value)
154+
155+
## Discovery v5 protocol setup
156+
let
157+
discoveryConfig =
158+
DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
159+
d = newProtocol(
160+
netkey,
161+
extIp,
162+
Opt.none(Port),
163+
extUdpPort,
164+
# Note: The addition of default clientInfo to the ENR is a temporary
165+
# measure to easily identify & debug the clients used in the testnet.
166+
# Might make this into a, default off, cli option.
167+
localEnrFields = {"c": enrClientInfoShort},
168+
bootstrapRecords = bootstrapRecords,
169+
previousRecord = previousEnr,
170+
bindIp = bindIp,
171+
bindPort = udpPort,
172+
enrAutoUpdate = true,
173+
config = discoveryConfig,
174+
rng = rng,
175+
)
176+
177+
d.open()
178+
179+
## Portal node setup
180+
let
181+
portalProtocolConfig = PortalProtocolConfig.init(
182+
DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha, RadiusConfig(kind: Static, logRadius: 249),
183+
defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
184+
defaultDisableContentCache, defaultMaxConcurrentOffers
185+
)
186+
187+
portalNodeConfig = PortalNodeConfig(
188+
accumulatorFile: Opt.none(string),
189+
disableStateRootValidation: true,
190+
trustedBlockRoot: Opt.none(Digest),
191+
portalConfig: portalProtocolConfig,
192+
dataDir: string config.dataDir,
193+
storageCapacity: 0,
194+
contentRequestRetries: 1
195+
)
196+
197+
node = PortalNode.new(
198+
PortalNetwork.mainnet,
199+
portalNodeConfig,
200+
d,
201+
{PortalSubnetwork.history},
202+
bootstrapRecords = bootstrapRecords,
203+
rng = rng,
204+
)
205+
206+
let enrFile = config.dataDir / "nimbus_portal_node.enr"
207+
if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
208+
fatal "Failed to write the enr file", file = enrFile
209+
quit 1
210+
211+
## Start the Portal node.
212+
node.start()
213+
214+
node
215+
216+
proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]], startBlock: uint64): Future[void] {.async.} =
217+
let historyNetwork = node.historyNetwork.value()
218+
var blockNumber = startBlock
219+
220+
let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048)
221+
var blocks: seq[EthBlock] = newSeq[EthBlock](8192)
222+
var count = 0
223+
224+
proc blockWorker(node: PortalNode): Future[void] {.async.} =
225+
while true:
226+
let (blockNumber, i) = await blockNumberQueue.popFirst()
227+
while true:
228+
let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr:
229+
warn "Failed to get block", blockNumber = blockNumber + i
230+
# Note: loop will get stuck here if a block is not available
231+
continue
232+
233+
blocks[i] = init(EthBlock, header, body)
234+
count.inc()
235+
236+
break
237+
238+
var workers: seq[Future[void]] = @[]
239+
for i in 0 ..< 1024:
240+
workers.add node.blockWorker()
241+
242+
while true:
243+
blocks = newSeq[EthBlock](8192)
244+
count = 0
245+
info "Downloading 8192 blocks", startBlock = blockNumber
246+
for i in 0..8191'u64:
247+
await blockNumberQueue.addLast((blockNumber, i))
248+
249+
# Not great :/
250+
while count != 8192:
251+
await sleepAsync(10.milliseconds)
252+
info "Finished downloading 8192 blocks", startBlock = blockNumber
253+
await blockQueue.addLast(blocks)
254+
255+
blockNumber += 8192
256+
257+
proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]]) {.async.} =
91258
proc controlCHandler() {.noconv.} =
92259
when defined(windows):
93260
# workaround for https://github.com/nim-lang/Nim/issues/4057
@@ -119,7 +286,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
119286
boolFlag(NoPersistBodies, not conf.storeBodies) +
120287
boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
121288
boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
122-
blk: Block
289+
blk: blocks.Block
123290
persister = Persister.init(chain, flags)
124291
cstats: PersistStats # stats at start of chunk
125292
@@ -293,11 +460,19 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
293460

294461
while running and persister.stats.blocks.uint64 < conf.maxBlocks and
295462
blockNumber <= lastEra1Block:
296-
if not loadEraBlock(blockNumber):
297-
notice "No more `era1` blocks to import", blockNumber, slot
298-
break
299-
persistBlock()
300-
checkpoint()
463+
if not conf.usePortal:
464+
if not loadEraBlock(blockNumber):
465+
notice "No more `era1` blocks to import", blockNumber, slot
466+
break
467+
persistBlock()
468+
checkpoint()
469+
else:
470+
let blockSeq = await blockQueue.popFirst()
471+
for blck in blockSeq:
472+
blk = blck
473+
persistBlock()
474+
checkpoint()
475+
# debugEcho "blck:" & $blck.header.number
301476

302477
block era1Import:
303478
if blockNumber > lastEra1Block:
@@ -366,3 +541,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
366541
blocks = persister.stats.blocks,
367542
txs = persister.stats.txs,
368543
mgas = f(persister.stats.gas.float / 1000000)
544+
545+
proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {.
546+
raises: [CatchableError]
547+
.} =
548+
let
549+
portalNode = run(conf)
550+
blockQueue = newAsyncQueue[seq[EthBlock]](4)
551+
start = com.db.baseTxFrame().getSavedStateBlockNumber() + 1
552+
553+
if conf.usePortal:
554+
asyncSpawn portalNode.getBlockLoop(blockQueue, start)
555+
556+
asyncSpawn importBlocks(conf, com, portalNode, blockQueue)
557+
558+
while running:
559+
try:
560+
poll()
561+
except CatchableError as e:
562+
warn "Exception in poll()", exc = e.name, err = e.msg

0 commit comments

Comments
 (0)