Skip to content

Commit 108c7f9

Browse files
author
cortze
committed
make first lookup phase for the block-builder dht sample seeding
1 parent 02c5a18 commit 108c7f9

File tree

9 files changed

+186
-38
lines changed

9 files changed

+186
-38
lines changed

.gitmodules

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[submodule "py-dht"]
22
path = py-dht
3-
url = https://github.com/cortze/py-dht.git
3+
url = git@github.com:cortze/py-dht.git

DAS/block.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import random
44
from bitarray import bitarray
55
from bitarray.util import zeros
6+
from dht.hashes import Hash
67

78
class Block:
89
"""This class represents a block in the Ethereum blockchain."""
@@ -81,3 +82,17 @@ def print(self):
8182
print(line+"|")
8283
print(dash)
8384

85+
86+
# --- DHT Related ---
87+
def getUniqueIDforSegment(self, rowID, columnID):
88+
"""It returns a unique ID for a segment indicating its coordinates in the block"""
89+
return f"r{rowID}-c{columnID}"
90+
91+
def getSegmentHash(self, rowID, columnID):
92+
"""It generates the Hash that will be used to identify the segment in the DHT.
93+
94+
This includes matching the uniqueID based on the row and the column
95+
with the actual value of the segment.
96+
"""
97+
segmentID = self.getUniqueIDforSegment(rowID, columnID) + f"x{self.getSegment(rowID, columnID)}"
98+
return Hash(segmentID)

DAS/dht_test.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import time
2+
import random
3+
from dht import DHTNetwork, Hash
4+
5+
# --- DHT parameters ---
6+
# DHT Network
7+
size = 10000
8+
jobs = 8
9+
errorRate = 0
10+
delayRange = [50, 50]
11+
12+
# DHT Client
13+
k = 20
14+
alpha = 3
15+
beta = k
16+
stepsToStop = 5
17+
18+
# Init the network
19+
n = DHTNetwork(0, errorRate, delayRange)
20+
start = time.time()
21+
nodeIDs = n.init_with_random_peers(jobs, size, k, alpha, beta, stepsToStop)
22+
print(f"Network init in {time.time() - start} secs")
23+
print()
24+
25+
# Test the DHTClient's functionality
26+
testNode = n.nodestore.get_node(random.randint(0, size))
27+
segmentToProvide = "my rollup data"
28+
29+
start = time.time()
30+
closestNodes, val, summary, aggrDelay = testNode.lookup_for_hash(Hash(segmentToProvide))
31+
print(f"Lookup done in {time.time() - start} secs")
32+
33+
start = time.time()
34+
summary, aggrDelay = testNode.provide_block_segment(segmentToProvide)
35+
print(f"Provide done in {time.time() - start} secs")
36+

DAS/shape.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
class Shape:
44
"""This class represents a set of parameters for a specific simulation."""
55

6-
def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha, run):
6+
def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run):
77
"""Initializes the shape with the parameters passed in argument."""
88
# block-segment related parameters
99
self.run = run
@@ -21,6 +21,7 @@ def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1rati
2121
self.bwUplink2 = bwUplink2
2222
self.randomSeed = ""
2323
# DHT related parameters
24+
self.dhtSeeding = dhtSeeding
2425
self.k = k
2526
self.alpha = alpha
2627

@@ -39,6 +40,7 @@ def __repr__(self):
3940
shastr += "-bwup1-"+str(self.bwUplink1)
4041
shastr += "-bwup2-"+str(self.bwUplink2)
4142
shastr += "-nd-"+str(self.netDegree)
43+
shastr += "-dht-seed-"+str(self.dhtSeeding)
4244
shastr += "-k-"+str(self.k)
4345
shastr += "-alpha-"+str(self.alpha)
4446
shastr += "-r-"+str(self.run)

DAS/simulator.py

Lines changed: 90 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging, random
55
import pandas as pd
66
from functools import partial, partialmethod
7+
from collections import deque
78
from datetime import datetime
89
from DAS.tools import *
910
from DAS.results import *
@@ -21,6 +22,7 @@ def __init__(self, shape, config, execID):
2122
self.format = {"entity": "Simulator"}
2223
self.execID = execID
2324
self.result = Result(self.shape, self.execID)
25+
self.dhtResult = Result(self.shape, self.execID)
2426
self.validators = []
2527
self.logger = []
2628
self.logLevel = config.logLevel
@@ -31,6 +33,7 @@ def __init__(self, shape, config, execID):
3133
self.distC = []
3234
self.nodeRows = []
3335
self.nodeColumns = []
36+
self.dhtNetwork = DHTNetwork(0, 0, [0])
3437

3538
# In GossipSub the initiator might push messages without participating in the mesh.
3639
# proposerPublishOnly regulates this behavior. If set to true, the proposer is not
@@ -178,23 +181,6 @@ def initNetwork(self):
178181
self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format)
179182
self.logger.debug("Val %d : colN %s", i, self.validators[i].columnNeighbors, extra=self.format)
180183

181-
def initDHTNetwork(self):
182-
""" Compose the DHT network based on the pre-initialized Validators """
183-
# compose the DHT networking layer
184-
self.logger.info("Initializing DHTNetwork... with %d nodes" % self.shape.numberNodes, extra=self.format)
185-
self.DHTNetwork = DHTNetwork(self.execID, self.shape.failureRate, self.config.stepDuration)
186-
187-
# initialize each of the routing tables
188-
startTime = time.time()
189-
_ = self.DHTNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes,
190-
self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup)
191-
self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format)
192-
193-
# add the initialized DHTClient back to the Validator
194-
for val in self.validators:
195-
val.addDHTClient(self.DHTNetwork.nodestore.get_node(val.ID))
196-
# the network should be ready to go :)
197-
198184
def initLogger(self):
199185
"""It initializes the logger."""
200186
logging.TRACE = 5
@@ -239,7 +225,7 @@ def runBlockBroadcasting(self):
239225
self.glob.checkRowsColumns(self.validators)
240226
for i in range(0,self.shape.numberNodes):
241227
if i == self.proposerID:
242-
self.validators[i].initBlock()
228+
self.block = self.validators[i].initBlock() # Keep the OG block that we are broadcasting
243229
else:
244230
self.validators[i].logIDs()
245231
arrived, expected, ready, validatedall, validated = self.glob.checkStatus(self.validators)
@@ -253,7 +239,7 @@ def runBlockBroadcasting(self):
253239
oldMissingSamples = missingSamples
254240
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
255241
for i in range(0,self.shape.numberNodes):
256-
self.validators[i].send()
242+
self.validators[i].sendToNeigbors()
257243
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
258244
for i in range(1,self.shape.numberNodes):
259245
self.validators[i].receiveRowsColumns()
@@ -325,6 +311,89 @@ def runBlockBroadcasting(self):
325311
self.result.populate(self.shape, self.config, missingVector)
326312
return self.result
327313

328-
def runBlockPublicationToDHT(self):
329-
"""It runs the main DHT simulation, where the block proposer has to send the segments to the XOR close enough nodes."""
314+
def initDHTNetwork(self):
315+
""" Compose the DHT network based on the pre-initialized Validators """
316+
# compose the DHT networking layer
317+
self.logger.info("Initializing DHTNetwork... with %d nodes" % self.shape.numberNodes, extra=self.format)
318+
self.dhtNetwork = DHTNetwork(self.execID, self.shape.failureRate, [self.config.stepDuration])
319+
320+
# initialize each of the routing tables
321+
startTime = time.time()
322+
_ = self.dhtNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes,
323+
self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup)
324+
self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format)
325+
326+
# add the initialized DHTClient back to the Validator
327+
for val in self.validators:
328+
val.addDHTclient(self.dhtNetwork.nodestore.get_node(val.ID))
329+
# the network should be ready to go :)
330+
331+
def runBlockPublicationToDHT(self, strategy):
332+
"""It runs the dht simulation to seed the DHT with blocks' info"""
333+
334+
if strategy == "builder-seeding-segments":
335+
self.logger.info("Seeding DHT with '%s' strategy" % strategy, extra=self.format)
336+
self.dhtBlockProposerSeedingDHTwithSegments()
337+
else:
338+
self.logger.error("unable to identify DHT seeding strategy '%s'" % strategy, extra=self.format)
339+
330340
return
341+
342+
def dhtBlockProposerSeedingDHTwithSegments(self):
343+
"""It runs the simulation where the block builder has to seed the DHT with all the block segments"""
344+
# check who is the block proposer
345+
blockProposer = self.dhtNetwork.nodestore.get_node(self.proposerID)
346+
self.logger.info("Node %d will start providing the block to the DHT!" % self.proposerID, extra=self.format)
347+
348+
# make a dht lookup for each of the segments in the block
349+
# TODO: currently sequential, add randomness later
350+
# TODO: it is pretty hard to define the bandwidth usage of so many lookups,
351+
# a concurrency degree could help though (only XX lookups at the time)
352+
totalSegements = self.shape.blockSize * self.shape.blockSize
353+
segmentIDs = deque(maxlen=totalSegements)
354+
segmentHashes = deque(maxlen=totalSegements)
355+
segmentValues = deque(maxlen=totalSegements)
356+
closestNodes = deque(maxlen=totalSegements)
357+
lookupAggrDelays = deque(maxlen=totalSegements)
358+
lookupTotalAttempts = deque(maxlen=totalSegements)
359+
lookupConnectedNodes = deque(maxlen=totalSegements)
360+
lookupProcessExecTime = deque(maxlen=totalSegements)
361+
362+
lookupStartTime = time.time()
363+
for rowID in range(self.shape.blockSize):
364+
for columnID in range(self.shape.blockSize):
365+
segmentID = self.block.getUniqueIDforSegment(rowID, columnID)
366+
segmentHash = self.block.getSegmentHash(rowID, columnID)
367+
segmentValue = self.block.getSegment(rowID, columnID)
368+
self.logger.debug(f"starting DHT lookup for segment {segmentID} with hash {segmentHash}",
369+
extra=self.format)
370+
nodes, _, summary, aggrDelay = blockProposer.lookup_for_hash(segmentHash)
371+
self.logger.debug(
372+
f"finished DHT lookup for segment {segmentID} with hash {segmentHash} in {summary['finishTime'] - summary['startTime']} secs",
373+
extra=self.format)
374+
segmentIDs.append(segmentID)
375+
segmentHashes.append(segmentHash)
376+
segmentValues.append(segmentValue)
377+
closestNodes.append(nodes)
378+
lookupAggrDelays.append(aggrDelay)
379+
lookupTotalAttempts.append(summary["connectionAttempts"])
380+
lookupConnectedNodes.append(summary["successfulCons"])
381+
lookupProcessExecTime.append(summary["finishTime"] - summary["startTime"])
382+
self.logger.info(f"lookup for the {totalSegements} segments done in {time.time() - lookupStartTime} secs",
383+
extra=self.format)
384+
385+
# make the provide operation of the segments to the closest nodes
386+
# TODO: at the moment, this only supports the standard Provide operation (mimicking IPFS' provide operation)
387+
# for each segment add the K closest nodes as neighbours
388+
389+
# start the dissemination of the segments based on avg latency windows,
390+
# track Tx and Rx stats
391+
# remember, opening a connection uses one latency step
392+
393+
# when there are no more segments to disseminate, get all the metrics
394+
# Avg successful provides vs failed ones on provide
395+
# avg time for the lookup
396+
397+
# TODO: do we want to check if the content would be retrievable?
398+
399+
return

DAS/validator.py

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,17 @@ def __init__(self, ID, amIproposer, logger, shape, config, rows = None, columns
109109
self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
110110
self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
111111

112+
# --- DHT Related ---
113+
self.segmentDHTneighbors = collections.defaultdict(dict)
114+
115+
# DHT statistics
116+
self.dhtStatsTxInSlot = 0
117+
self.dhtStatsTxPerSlot = []
118+
self.dhtStatsRxInSlot = 0
119+
self.dhtStatsRxPerSlot = []
120+
self.dhtStatsRxDupInSlot = 0
121+
self.dhtStatsRxDupPerSlot = []
122+
112123
def logIDs(self):
113124
"""It logs the assigned rows and columns."""
114125
if self.amIproposer == 1:
@@ -117,16 +128,8 @@ def logIDs(self):
117128
self.logger.debug("Selected rows: "+str(self.rowIDs), extra=self.format)
118129
self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format)
119130

120-
def addDHTClient(self, dhtClient):
121-
self.logger.debug("Adding new DHTClient...", extra=self.format)
122-
# double check that
123-
if dhtClient.ID != self.ID:
124-
self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format)
125-
# TODO: do we want to panic here if the IDs don't match?
126-
self.DHTClient = dhtClient
127-
128131
def initBlock(self):
129-
"""It initializes the block for the proposer."""
132+
"""It initializes and returns the block for the proposer"""
130133
if self.amIproposer == 0:
131134
self.logger.warning("I am not a block proposer", extra=self.format)
132135
else:
@@ -185,6 +188,8 @@ def initBlock(self):
185188
measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
186189
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
187190

191+
return self.block
192+
188193
def getColumn(self, index):
189194
"""It returns a given column."""
190195
return self.block.getColumn(index)
@@ -454,7 +459,7 @@ def nextSegment():
454459
if self.statsTxInSlot >= self.bwUplink:
455460
return
456461

457-
def send(self):
462+
def sendToNeigbors(self):
458463
""" Send as much as we can in the timestep, limited by bwUplink."""
459464

460465
# process node level send queue
@@ -552,3 +557,20 @@ def checkStatus(columnIDs, rowIDs):
552557
validated+=1
553558

554559
return arrived, expected, validated
560+
561+
# --- DHT Related ---
562+
563+
def addDHTclient(self, dhtClient):
564+
"""Add a DHTClient with its respective routing table as part of the Validator"""
565+
self.logger.debug("Adding new DHTClient...", extra=self.format)
566+
# double check that the DHTClient's ID matches this Validator's ID
567+
if dhtClient.ID != self.ID:
568+
self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format)
569+
# TODO: do we want to panic here if the IDs don't match?
570+
self.dhtClient = dhtClient
571+
572+
def setDHTtargetForSegment(self):
573+
pass
574+
def sendDHTsegments(self):
575+
"""DHT equivalent to sendToNeigbors(): disseminates block segments over the DHT"""
576+
pass

py-dht

Submodule py-dht updated from ccf8d14 to f6aefd1

smallConf.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@
106106
# True to simulate the distribution of the BlockSegments over a simulated DHTNetwork
107107
dhtSimulation = True
108108

109+
# Define the strategy used to seed or disseminate the Block to the DHT
110+
# "builder-seeding-segments" -> The block builder is in charge of seeding the DHT with the block samples
111+
dhtSeedings = ["builder-seeding-segments"]
112+
109113
# K replication factor, in how many DHT nodes are we going to store the block segments
110114
# reused as how many DHT nodes will fit into each Kbucket to the routing table
111115
ks = [20]
@@ -118,9 +122,9 @@
118122
nilStepsToStopLookup = 3
119123

120124
def nextShape():
121-
for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha in itertools.product(
122-
runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2, ks, alphas):
125+
for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha in itertools.product(
126+
runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2, dhtSeedings, ks, alphas):
123127
# Network Degree has to be an even number
124128
if netDegree % 2 == 0:
125-
shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha, run)
129+
shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run)
126130
yield shape

study.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def runOnce(config, shape, execID):
3838
if config.dhtSimulation:
3939
sim.logger.info("Shape: %s ... Setting up DHT Network" % (str(sim.shape.__dict__)), extra=sim.format)
4040
sim.initDHTNetwork()
41-
sim.runBlockPublicationToDHT()
41+
sim.runBlockPublicationToDHT(shape.dhtSeeding)
4242
sim.logger.info("Shape: %s ... Finished up Block propagation on the DHT Network" % (str(sim.shape.__dict__)), extra=sim.format)
4343
# TODO: append the DHT results to the previous results
4444

0 commit comments

Comments
 (0)