Skip to content

Commit 7f49899

Browse files
committed
Quick and dirty PoC for syncing from Portal history network
This PR is not intended to get merged. It is a very quick and dirty implementation with the intention of testing the Portal network and Fluffy code and to verify how long downloads of blocks take in comparison with the execution of them. It kind of abuses the current import from era code to do this. I think (?) in an improved version the block downloads should probably lead the implementation and trigger execution (it is a bit the reverse right now, which makes sense for era files). Perhaps that way the execution could even be offloaded to another thread? It is also coded without using the JSON-RPC API, as I found that easier for a quick version. But the getBlock call could be changed to use the json-rpc alternative.
1 parent f033a40 commit 7f49899

File tree

3 files changed

+225
-10
lines changed

3 files changed

+225
-10
lines changed

nimbus/config.nim

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,12 +559,25 @@ type
559559
defaultValue: false
560560
name: "debug-store-slot-hashes".}: bool
561561

562+
usePortal* {.
563+
hidden
564+
desc: "Use portal network instead of era files"
565+
defaultValue: false
566+
name: "debug-use-portal".}: bool
567+
568+
portalWorkers* {.
569+
hidden
570+
desc: "Amount of Portal workers to use for downloading blocks"
571+
defaultValue: 512
572+
name: "debug-portal-workers".}: int
573+
562574
of `import-rlp`:
563575
blocksFile* {.
564576
argument
565577
desc: "One or more RLP encoded block(s) files"
566578
name: "blocks-file" }: seq[InputFile]
567579

580+
568581
func parseCmdArg(T: type NetworkId, p: string): T
569582
{.gcsafe, raises: [ValueError].} =
570583
parseBiggestUInt(p).T

nimbus/nimbus_execution_client.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
256256

257257
case conf.cmd
258258
of NimbusCmd.`import`:
259-
importBlocks(conf, com)
259+
importBlocksPortal(conf, com)
260260
of NimbusCmd.`import-rlp`:
261261
importRlpBlocks(conf, com)
262262
else:

nimbus/nimbus_import.nim

Lines changed: 211 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,25 @@ import
1313
chronicles,
1414
metrics,
1515
chronos/timer,
16-
std/[strformat, strutils],
16+
chronos,
17+
std/[strformat, strutils, os],
1718
stew/io2,
1819
beacon_chain/era_db,
1920
beacon_chain/networking/network_metadata,
2021
./config,
2122
./common/common,
2223
./core/chain,
2324
./db/era1_db,
24-
./utils/era_helpers
25+
./utils/era_helpers,
26+
eth/common/keys, # rng
27+
eth/net/nat, # setupAddress
28+
eth/p2p/discoveryv5/protocol as discv5_protocol,
29+
eth/p2p/discoveryv5/routing_table,
30+
eth/p2p/discoveryv5/enr,
31+
../fluffy/portal_node,
32+
../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
33+
../fluffy/network_metadata,
34+
../fluffy/version
2535

2636
declareGauge nec_import_block_number, "Latest imported block number"
2737

@@ -87,7 +97,172 @@ template boolFlag(flags, b): PersistBlockFlags =
8797
else:
8898
{}
8999

90-
proc importBlocks*(conf: NimbusConf, com: CommonRef) =
100+
proc run(config: NimbusConf): PortalNode {.
101+
raises: [CatchableError]
102+
.} =
103+
let rng = newRng()
104+
105+
## Network configuration
106+
let
107+
bindIp = config.listenAddress
108+
udpPort = Port(config.udpPort)
109+
# TODO: allow for no TCP port mapping!
110+
(extIp, _, extUdpPort) =
111+
try:
112+
setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
113+
except CatchableError as exc:
114+
raiseAssert exc.msg
115+
# raise exc # TODO: Ideally we don't have the Exception here
116+
except Exception as exc:
117+
raiseAssert exc.msg
118+
(netkey, newNetKey) =
119+
# if config.netKey.isSome():
120+
# (config.netKey.get(), true)
121+
# else:
122+
getPersistentNetKey(rng[], config.dataDir / "netkey")
123+
124+
enrFilePath = config.dataDir / "nimbus_portal_node.enr"
125+
previousEnr =
126+
if not newNetKey:
127+
getPersistentEnr(enrFilePath)
128+
else:
129+
Opt.none(enr.Record)
130+
131+
var bootstrapRecords: seq[Record]
132+
# loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
133+
# bootstrapRecords.add(config.bootstrapNodes)
134+
135+
# case config.network
136+
# of PortalNetwork.none:
137+
# discard # don't connect to any network bootstrap nodes
138+
# of PortalNetwork.mainnet:
139+
# for enrURI in mainnetBootstrapNodes:
140+
# let res = enr.Record.fromURI(enrURI)
141+
# if res.isOk():
142+
# bootstrapRecords.add(res.value)
143+
# of PortalNetwork.angelfood:
144+
# for enrURI in angelfoodBootstrapNodes:
145+
# let res = enr.Record.fromURI(enrURI)
146+
# if res.isOk():
147+
# bootstrapRecords.add(res.value)
148+
149+
# Only mainnet
150+
for enrURI in mainnetBootstrapNodes:
151+
let res = enr.Record.fromURI(enrURI)
152+
if res.isOk():
153+
bootstrapRecords.add(res.value)
154+
155+
## Discovery v5 protocol setup
156+
let
157+
discoveryConfig =
158+
DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
159+
d = newProtocol(
160+
netkey,
161+
extIp,
162+
Opt.none(Port),
163+
extUdpPort,
164+
# Note: The addition of default clientInfo to the ENR is a temporary
165+
# measure to easily identify & debug the clients used in the testnet.
166+
# Might make this into a, default off, cli option.
167+
localEnrFields = {"c": enrClientInfoShort},
168+
bootstrapRecords = bootstrapRecords,
169+
previousRecord = previousEnr,
170+
bindIp = bindIp,
171+
bindPort = udpPort,
172+
enrAutoUpdate = true,
173+
config = discoveryConfig,
174+
rng = rng,
175+
)
176+
177+
d.open()
178+
179+
## Portal node setup
180+
let
181+
portalProtocolConfig = PortalProtocolConfig.init(
182+
DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha, RadiusConfig(kind: Static, logRadius: 249),
183+
defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
184+
defaultDisableContentCache, defaultMaxConcurrentOffers
185+
)
186+
187+
portalNodeConfig = PortalNodeConfig(
188+
accumulatorFile: Opt.none(string),
189+
disableStateRootValidation: true,
190+
trustedBlockRoot: Opt.none(Digest),
191+
portalConfig: portalProtocolConfig,
192+
dataDir: string config.dataDir,
193+
storageCapacity: 0,
194+
contentRequestRetries: 1
195+
)
196+
197+
node = PortalNode.new(
198+
PortalNetwork.mainnet,
199+
portalNodeConfig,
200+
d,
201+
{PortalSubnetwork.history},
202+
bootstrapRecords = bootstrapRecords,
203+
rng = rng,
204+
)
205+
206+
let enrFile = config.dataDir / "nimbus_portal_node.enr"
207+
if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
208+
fatal "Failed to write the enr file", file = enrFile
209+
quit 1
210+
211+
## Start the Portal node.
212+
node.start()
213+
214+
node
215+
216+
proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]], startBlock: uint64, portalWorkers: int): Future[void] {.async.} =
217+
let historyNetwork = node.historyNetwork.value()
218+
var blockNumber = startBlock
219+
220+
let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048)
221+
var blocks: seq[EthBlock] = newSeq[EthBlock](8192)
222+
var count = 0
223+
var failureCount = 0
224+
225+
proc blockWorker(node: PortalNode): Future[void] {.async.} =
226+
while true:
227+
let (blockNumber, i) = await blockNumberQueue.popFirst()
228+
while true:
229+
let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr:
230+
warn "Failed to get block", blockNumber = blockNumber + i
231+
# Note: loop will get stuck here if a block is not available
232+
failureCount.inc()
233+
continue
234+
235+
blocks[i] = init(EthBlock, header, body)
236+
count.inc()
237+
238+
break
239+
240+
var workers: seq[Future[void]] = @[]
241+
for i in 0 ..< portalWorkers:
242+
workers.add node.blockWorker()
243+
244+
while true:
245+
blocks = newSeq[EthBlock](8192)
246+
count = 0
247+
failureCount = 0
248+
info "Downloading 8192 blocks", startBlock = blockNumber
249+
let t0 = Moment.now()
250+
for i in 0..8191'u64:
251+
await blockNumberQueue.addLast((blockNumber, i))
252+
253+
# Not great :/
254+
while count != 8192:
255+
await sleepAsync(10.milliseconds)
256+
let t1 = Moment.now()
257+
let diff = (t1 - t0).nanoseconds().float / 1000000000
258+
let bps = 8192.float / diff
259+
260+
info "Finished downloading 8192 blocks", startBlock = blockNumber, bps = bps, failureCount = failureCount
261+
await blockQueue.addLast(blocks)
262+
263+
blockNumber += 8192
264+
265+
proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]]) {.async.} =
91266
proc controlCHandler() {.noconv.} =
92267
when defined(windows):
93268
# workaround for https://github.com/nim-lang/Nim/issues/4057
@@ -119,7 +294,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
119294
boolFlag(NoPersistBodies, not conf.storeBodies) +
120295
boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
121296
boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
122-
blk: Block
297+
blk: blocks.Block
123298
persister = Persister.init(chain, flags)
124299
cstats: PersistStats # stats at start of chunk
125300
@@ -293,11 +468,19 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
293468

294469
while running and persister.stats.blocks.uint64 < conf.maxBlocks and
295470
blockNumber <= lastEra1Block:
296-
if not loadEraBlock(blockNumber):
297-
notice "No more `era1` blocks to import", blockNumber, slot
298-
break
299-
persistBlock()
300-
checkpoint()
471+
if not conf.usePortal:
472+
if not loadEraBlock(blockNumber):
473+
notice "No more `era1` blocks to import", blockNumber, slot
474+
break
475+
persistBlock()
476+
checkpoint()
477+
else:
478+
let blockSeq = await blockQueue.popFirst()
479+
for blck in blockSeq:
480+
blk = blck
481+
persistBlock()
482+
checkpoint()
483+
# debugEcho "blck:" & $blck.header.number
301484

302485
block era1Import:
303486
if blockNumber > lastEra1Block:
@@ -366,3 +549,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
366549
blocks = persister.stats.blocks,
367550
txs = persister.stats.txs,
368551
mgas = f(persister.stats.gas.float / 1000000)
552+
553+
proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {.
554+
raises: [CatchableError]
555+
.} =
556+
let
557+
portalNode = run(conf)
558+
blockQueue = newAsyncQueue[seq[EthBlock]](4)
559+
start = com.db.baseTxFrame().getSavedStateBlockNumber() + 1
560+
561+
if conf.usePortal:
562+
asyncSpawn portalNode.getBlockLoop(blockQueue, start, conf.portalWorkers)
563+
564+
asyncSpawn importBlocks(conf, com, portalNode, blockQueue)
565+
566+
while running:
567+
try:
568+
poll()
569+
except CatchableError as e:
570+
warn "Exception in poll()", exc = e.name, err = e.msg

0 commit comments

Comments
 (0)