
Add the py-dht repo as a submodule #52

Open · wants to merge 10 commits into `develop`
8 changes: 7 additions & 1 deletion .gitignore
@@ -1,7 +1,13 @@
*.swp
*.pyc
results/*
myenv*/
*env*/
doc/_build
!results/plots.py
Frontend/
.idea

# DHT module related
DHT/imgs
DHT/csvs
DHT/.ipynb_checkpoints
3 changes: 3 additions & 0 deletions .gitmodules
@@ -0,0 +1,3 @@
[submodule "py-dht"]
path = py-dht
url = https://github.com/cortze/py-dht
5 changes: 5 additions & 0 deletions DAS/block.py
@@ -81,3 +81,8 @@ def print(self):
print(line+"|")
print(dash)


def getUniqueIDforSegment(self, rowID, columnID):
"""It returns a unique ID for a segment indicating its coordinates in the block"""
return f"r{rowID}-c{columnID}"

9 changes: 3 additions & 6 deletions DAS/simulator.py
@@ -1,10 +1,7 @@
#!/bin/python

import networkx as nx
import logging, random
import pandas as pd
from functools import partial, partialmethod
from datetime import datetime
from DAS.tools import *
from DAS.results import *
from DAS.observer import *
@@ -216,12 +213,12 @@ def printDiagnostics(self):
self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format)
self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format)

def run(self):
def runBlockBroadcasting(self):
"""It runs the main simulation until the block is available or it gets stucked."""
self.glob.checkRowsColumns(self.validators)
for i in range(0,self.shape.numberNodes):
if i == self.proposerID:
self.validators[i].initBlock()
self.block = self.validators[i].initBlock() # Keep the original block that we are broadcasting
else:
self.validators[i].logIDs()
arrived, expected, ready, validatedall, validated = self.glob.checkStatus(self.validators)
@@ -235,7 +232,7 @@ def run(self):
oldMissingSamples = missingSamples
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
for i in range(0,self.shape.numberNodes):
self.validators[i].send()
self.validators[i].sendToNeighbors()
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
self.validators[i].receiveRowsColumns()
6 changes: 4 additions & 2 deletions DAS/validator.py
@@ -118,7 +118,7 @@ def logIDs(self):
self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format)

def initBlock(self):
"""It initializes the block for the proposer."""
"""It initializes and returns the block for the proposer"""
if self.amIproposer == 0:
self.logger.warning("I am not a block proposer", extra=self.format)
else:
@@ -177,6 +177,8 @@ def initBlock(self):
measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)

return self.block

def getColumn(self, index):
"""It returns a given column."""
return self.block.getColumn(index)
@@ -446,7 +448,7 @@ def nextSegment():
if self.statsTxInSlot >= self.bwUplink:
return

def send(self):
def sendToNeighbors(self):
""" Send as much as we can in the timestep, limited by bwUplink."""

# process node level send queue
35 changes: 35 additions & 0 deletions DHT/README.md
@@ -0,0 +1,35 @@
# DHT simulations for DAS
Simulate the seeding and the retrieval of Ethereum DAS samples in a Kademlia-based DHT.

## Dependencies
The DHT module relies on [`py-dht`](https://github.com/cortze/py-dht); it is installed together with the rest of the DAS block dissemination dependencies.
```shell
# once the venv is created (make sure the venv name matches the one in `install_dependencies.sh`)
das-research$ bash install_dependencies.sh
```
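
As a quick optional sanity check (assuming the venv is active), the `dht` package installed by `py-dht` should be importable:
```python
# run inside the activated venv; py-dht provides the `dht` package used by the DHT module
import dht
print(dht.__name__)  # prints "dht" if the dependency is installed correctly
```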

## How to run it
To run the DHT seeding and retrieval simulation, follow these steps:
1. Configure the desired parameters in `dhtConf.py` (a configuration sketch follows this list). NOTE: the script will create the CSV and IMG folders for you.
2. Execute the experiment by running:
```shell
# venv needs to be activated
# $ source venv/bin/activate
das-research/DHT$ python3 dhtStudy.py dhtSmallConf.py
```
The output should look like this for each of the possible configurations:
```shell
network init done in 52.08381795883179 secs
[===============================================================================================================================================================================================================================] 100%
test done in 159.97085118293762 secs
DHT fast-init jobs:8 done in 52.08381795883179 secs
12000 nodes, k=20, alpha=3, 10000 lookups
mean time per lookup : 0.010750784277915955
mean aggr delay (secs): 0.31828715
mean contacted nodes: 8.7223
time to make 10000 lookups: 107.50784277915955 secs

done with the studies in 167.69087147712708
```
3. All the visualization graphs can be generated using the `retrieval_on_das_plotting.ipynb` notebook.
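
For reference, here is a minimal sketch of how a configuration could drive the `SingleDHTretrievalStudy` class added in this PR (see `DHT/dhtRetrievals.py` below). The folder names, parameter values, and delay-range format are illustrative assumptions, not the actual contents of `dhtConf.py` or `dhtStudy.py`:
```python
# Hypothetical driver sketch; all concrete values are assumptions for illustration.
from dhtRetrievals import SingleDHTretrievalStudy

study = SingleDHTretrievalStudy(
    csvFolder="csvs", imgFolder="imgs",  # output folders
    jobs=8,                              # parallel jobs for the fast network init
    nn=12000,                            # number of DHT nodes in the network
    rn=100,                              # number of retriever nodes to sample
    samples=100,                         # samples to publish and look up
    fErrR=0, sErrR=0,                    # fast / slow connection error rates
    cDelR=[50, 100],                     # connection delay range in ms (format assumed)
    fDelR=[50, 100],                     # fast delay range in ms (format assumed)
    sDelR=[1000, 1500],                  # slow delay / timeout range in ms (format assumed)
    k=20, a=3, b=20, y=0,                # Kademlia k, alpha, beta, gamma
    stepsToStop=3)                       # lookup termination threshold
study.run()
```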

1 change: 1 addition & 0 deletions DHT/__init__.py
@@ -0,0 +1 @@
from plots import *
270 changes: 270 additions & 0 deletions DHT/dhtRetrievals.py
@@ -0,0 +1,270 @@
import time
import progressbar
import random
import numpy as np
import pandas as pd
import dht
from utils import getStrFromDelayRange

TOTAL_PERCENTAGE = 100
PERCENTAGE_INTERVALS = 1


class SingleDHTretrievalStudy:

def __init__(self, csvFolder, imgFolder, jobs, nn, rn, samples,
fErrR, sErrR, cDelR, fDelR, sDelR, k, a, b, y, stepsToStop):
self.csvFolder = csvFolder
self.imgFolder = imgFolder
self.jobs = jobs
self.nn = nn
self.rn = rn
self.samples = samples
self.fastErrorRate = fErrR
self.slowErrorRate = sErrR
self.connDelayRange = cDelR
self.fastDelayRange = fDelR
self.slowDelayRange = sDelR # timeouts
self.k = k
self.alpha = a
self.beta = b
self.gamma = y
self.stepsToStop = stepsToStop
# namings
s = ""
s += f"_nn{nn}"
s += f"_rn{rn}"
s += f"_sampl{samples}"
s += f"_fer{fErrR}"
s += f"_ser{sErrR}"
s += f"_cdr{getStrFromDelayRange(cDelR)}"
s += f"_fdr{getStrFromDelayRange(fDelR)}"
s += f"_sdr{getStrFromDelayRange(sDelR)}"
s += f"_k{k}"
s += f"_a{a}"
s += f"_b{b}"
s += f"_y{y}"
s += f"_steps{stepsToStop}"
self.studyName = s
print(f"Retrieval Study => {s}")

def run(self):
# Init the DHT Network
testInitTime = time.time()
network = dht.DHTNetwork(
0,
self.fastErrorRate,
self.slowErrorRate,
self.connDelayRange,
self.fastDelayRange,
self.slowDelayRange,
self.gamma)
initStartTime = time.time()
network.init_with_random_peers(
self.jobs,
self.nn,
self.k,
self.alpha,
self.beta,
self.stepsToStop)
self.networkInitTime = time.time() - initStartTime
print(f"network init done in {self.networkInitTime} secs")

# get a random node to act as the builder that publishes the samples
builderNode = network.nodestore.get_node(random.randint(0, self.nn))

# create and publish `self.samples` samples to the network
# provide-operation metrics (one entry per sample)
ks = []
nns = []
stepstostops = []
fastErrorRate = []
slowErrorRate = []
connDelayRange = []
fastDelayRange = []
slowDelayRange = []
alphas = []
betas = []
gammas = []
providers = []
sampleNames = []
provideLookupAggrTime = []
provideAggrTime = []
provideOperationAggrTime = []
provideSuccNodes = []
provideFailedNodes = []
samples = []

for i in range(self.samples):
sampleContent = f"sample {i}"
summary, _ = builderNode.provide_block_segment(sampleContent)
samples.append((sampleContent, sampleContent, summary))
# add metrics for the csv
ks.append(self.k)
alphas.append(self.alpha)
betas.append(self.beta)
gammas.append(self.gamma)
nns.append(self.nn)
stepstostops.append(self.stepsToStop)
fastErrorRate.append(f"{self.fastErrorRate}")
slowErrorRate.append(f"{self.slowErrorRate}")
connDelayRange.append(f"{getStrFromDelayRange(self.connDelayRange)}")
fastDelayRange.append(f"{getStrFromDelayRange(self.fastDelayRange)}")
slowDelayRange.append(f"{getStrFromDelayRange(self.slowDelayRange)}")
providers.append(builderNode.ID)
sampleNames.append(sampleContent)
provideLookupAggrTime.append(summary['lookupDelay'])
provideAggrTime.append(summary['provideDelay'])
provideOperationAggrTime.append(summary['operationDelay'])
provideSuccNodes.append(len(summary['succesNodeIDs']))
provideFailedNodes.append(len(summary['failedNodeIDs']))

# save the provide data
df = pd.DataFrame({
"number_nodes": nns,
"k": ks,
"alpha": alphas,
"beta": betas,
"gamma": gammas,
"stop_steps": stepstostops,
"fast_error_rate": fastErrorRate,
"slow_error_rate": slowErrorRate,
"connection_delay_range": connDelayRange,
"fast_delay_range": fastDelayRange,
"slow_delay": slowDelayRange,
"provider": providers,
"sample": sampleNames,
"provide_lookup_aggr_time": provideLookupAggrTime,
"provide_aggr_time": provideAggrTime,
"provide_operation_aggr_time": provideOperationAggrTime,
"provide_succ_nodes": provideSuccNodes,
"provide_fail_nodes": provideFailedNodes,
})

df.to_csv(self.csvFolder + f"/retrieval_provide{self.studyName}.csv")
network.reset_network_metrics()
del df

nns = []
ks = []
alphas = []
betas = []
gammas = []
stepstostops = []
fastErrorRate = []
slowErrorRate = []
connDelayRange = []
fastDelayRange = []
slowDelayRange = []
retrievers = []
sampleNames = []
lookupTimes = []
lookupAggrDelays = []
attemptedNodes = []
finishedConnAttempts = []
successfullCons = []
failedCons = []
valRetrievable = []
totalDiscNodes = []
accuracies = []

bar = progressbar.ProgressBar(
maxval=self.rn,
widgets=[progressbar.Bar('=', '[', ']'), ' ', progressbar.Percentage()])
bar.start()

for i in range(self.rn):
retrieverNode = network.nodestore.get_node(random.randint(0, self.nn))
while retrieverNode.ID == builderNode.ID:
retrieverNode = network.nodestore.get_node(random.randint(0, self.nn))

for l in range(self.samples):
sampleContent = f"sample {l}"
sh = dht.Hash(sampleContent)
lstime = time.time()
closest, val, summary, aggrDelay = retrieverNode.lookup_for_hash(
key=sh, trackaccuracy=True, finishwithfirstvalue=True)
lduration = time.time() - lstime

if val == sampleContent:
valRetrievable.append(1)
else:
valRetrievable.append(0)

nns.append(self.nn)
ks.append(self.k)
alphas.append(self.alpha)
betas.append(self.beta)
gammas.append(self.gamma)
stepstostops.append(self.stepsToStop)
fastErrorRate.append(f"{self.fastErrorRate}")
slowErrorRate.append(f"{self.slowErrorRate}")
connDelayRange.append(f"{getStrFromDelayRange(self.connDelayRange)}")
fastDelayRange.append(f"{getStrFromDelayRange(self.fastDelayRange)}")
slowDelayRange.append(f"{getStrFromDelayRange(self.slowDelayRange)}")
retrievers.append(retrieverNode.ID)
sampleNames.append(sampleContent)
lookupTimes.append(lduration)
lookupAggrDelays.append(aggrDelay)
finishedConnAttempts.append(summary['connectionFinished'])
attemptedNodes.append(summary['connectionAttempts'])
successfullCons.append(summary['successfulCons'])
failedCons.append(summary['failedCons'])
totalDiscNodes.append(summary['totalNodes'])
accuracies.append(summary['accuracy'])

# clean up the memory
del sh
del summary
del closest

# update the progress bar
bar.update(i + 1)

bar.finish()

testDuration = time.time() - testInitTime
print(f"test done in {testDuration} secs")
print(f"DHT fast-init jobs:{self.jobs} done in {self.networkInitTime} secs")
print(f"{self.nn} nodes, k={self.k}, alpha={self.alpha}, {len(lookupTimes)} lookups")
print(f"mean time per lookup : {np.mean(lookupTimes)}")
print(f"mean aggr delay (secs): {np.mean(lookupAggrDelays) / 1000}")
print(f"mean contacted nodes: {np.mean(attemptedNodes)}")
print(f"time to make {len(lookupTimes)} lookups: {np.sum(lookupTimes)} secs")
print()

# Create the pandas DataFrame and export it to a CSV
df = pd.DataFrame({
"number_nodes": nns,
"k": ks,
"alpha": alphas,
"beta": betas,
"gamma": gammas,
"stop_steps": stepstostops,
"fast_error_rate": fastErrorRate,
"slow_error_rate": slowErrorRate,
"connection_delay_range": connDelayRange,
"fast_delay_range": fastDelayRange,
"slow_delay": slowDelayRange,
"retriever": retrievers,
"sample": sampleNames,
"lookup_wallclock_time": lookupTimes,
"lookup_aggregated_delay": lookupAggrDelays,
"attempted_nodes": attemptedNodes,
"finished_connection_attempts": finishedConnAttempts,
"successful_connections": successfullCons,
"failed_connections": failedCons,
"total_discovered_nodes": totalDiscNodes,
"retrievable": valRetrievable,
"accuracy": accuracies,
})
df.to_csv(self.csvFolder + f"/retrieval_lookup{self.studyName}.csv")

# save the network metrics
networkMetrics = network.connection_metrics()
network_df = pd.DataFrame(networkMetrics)
network_df.to_csv(self.csvFolder + f"/retrieval_lookup_network{self.studyName}.csv")

del network
del df
del network_df