diff --git a/.gitignore b/.gitignore index 43965e8..06cd68e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,13 @@ *.swp *.pyc results/* -myenv*/ +*env*/ doc/_build !results/plots.py Frontend/ +.idea + +# DHT module related +DHT/imgs +DHT/csvs +DHT/.ipynb_checkpoints diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..879cad6 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "py-dht"] + path = py-dht + url = https://github.com/cortze/py-dht diff --git a/DAS/block.py b/DAS/block.py index f76a944..06f8aef 100644 --- a/DAS/block.py +++ b/DAS/block.py @@ -81,3 +81,8 @@ def print(self): print(line+"|") print(dash) + + def getUniqueIDforSegment(self, rowID, columnID): + """It returns a unique ID for a segment indicating its coordinates in the block""" + return f"r{rowID}-c{columnID}" + diff --git a/DAS/simulator.py b/DAS/simulator.py index 174a9b2..156678a 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -1,10 +1,7 @@ #!/bin/python - import networkx as nx -import logging, random import pandas as pd from functools import partial, partialmethod -from datetime import datetime from DAS.tools import * from DAS.results import * from DAS.observer import * @@ -216,12 +213,12 @@ def printDiagnostics(self): self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format) self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format) - def run(self): + def runBlockBroadcasting(self): """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) for i in range(0,self.shape.numberNodes): if i == self.proposerID: - self.validators[i].initBlock() + self.block = self.validators[i].initBlock() # Keep the OG block that we are broadcasting else: self.validators[i].logIDs() arrived, expected, ready, validatedall, validated = self.glob.checkStatus(self.validators) @@ -235,7 +232,7 @@ def run(self): oldMissingSamples = missingSamples self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberNodes): - self.validators[i].send() + self.validators[i].sendToNeigbors() self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberNodes): self.validators[i].receiveRowsColumns() diff --git a/DAS/validator.py b/DAS/validator.py index 4e8d350..7602564 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -118,7 +118,7 @@ def logIDs(self): self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format) def initBlock(self): - """It initializes the block for the proposer.""" + """It initializes and returns the block for the proposer""" if self.amIproposer == 0: self.logger.warning("I am not a block proposer", extra=self.format) else: @@ -177,6 +177,8 @@ def initBlock(self): measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) + return self.block + def getColumn(self, index): """It returns a given column.""" return self.block.getColumn(index) @@ -446,7 +448,7 @@ def nextSegment(): if self.statsTxInSlot >= self.bwUplink: return - def send(self): + def sendToNeigbors(self): """ Send as much as we can in the timestep, limited by bwUplink.""" # process node level send queue diff --git a/DHT/README.md 
b/DHT/README.md new file mode 100644 index 0000000..8de2693 --- /dev/null +++ b/DHT/README.md @@ -0,0 +1,35 @@
+# DHT simulations for DAS
+Simulate the seeding and retrieval of Ethereum DAS samples in a Kademlia-based DHT.
+
+## Dependencies
+The DHT module relies on [`py-dht`](https://github.com/cortze/py-dht) to run; it is installed together with the DAS block dissemination dependencies.
+```shell
+# once the venv is created (make sure the venv name matches the one in `install_dependencies.sh`)
+das-research$ bash install_dependencies.sh
+```
+
+## How to run it
+To run the seeding and retrieval simulation of the DHT, follow these steps:
+1. Configure the desired parameters in the configuration file (e.g. `dhtSmallConf.py`). NOTE: the script will create the CSV and IMG folders for you!
+2. Execute the experiment by running:
+```shell
+# the venv needs to be activated
+# $ source venv/bin/activate
+das-research/DHT$ python3 dhtStudy.py dhtSmallConf.py
+```
+The output should look like this for each of the possible configurations:
+```shell
+network init done in 52.08381795883179 secs
+[===============================================================================] 100%
+test done in 159.97085118293762 secs
+DHT fast-init jobs:8 done in 52.08381795883179 secs
+12000 nodes, k=20, alpha=3, 10000 lookups
+mean time per lookup : 0.010750784277915955
+mean aggr delay (secs): 0.31828715
+mean contacted nodes: 8.7223
+time to make 10000 lookups: 107.50784277915955 secs
+
+done with the studies in 167.69087147712708
+```
+3. All the visualization graphs can be generated using the `retrieval_on_das_plotting.ipynb` notebook.
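For orientation, the retrieval study added in `DHT/dhtRetrievals.py` below drives `py-dht` through a handful of calls: build a simulated Kademlia network, pick a builder node that provides each sample, and have retriever nodes look the samples up by hash. The following is a minimal sketch of that flow, not part of the PR itself; it uses only the calls that appear in this diff, and the parameter values simply mirror `dhtSmallConf.py`:

```python
import random
import dht  # py-dht, installed by install_dependencies.sh

# constructor args in the order used by dhtRetrievals.py:
# (0, fastErrorRate, slowErrorRate, connDelayRange, fastDelayRange, slowDelayRange, gamma)
network = dht.DHTNetwork(0, 10, 0, range(50, 76, 1), range(50, 101, 1), None, 0.125)
# init args in the order used by dhtRetrievals.py: jobs, nodes, k, alpha, beta, steps-to-stop
network.init_with_random_peers(1, 2_000, 20, 3, 20, 3)

# a random node seeds ("provides") a sample into the DHT
builder = network.nodestore.get_node(random.randint(0, 2_000))
provide_summary, _ = builder.provide_block_segment("sample 0")

# any other node can then retrieve it with a lookup over the sample's hash
retriever = network.nodestore.get_node(random.randint(0, 2_000))
closest, value, lookup_summary, aggr_delay = retriever.lookup_for_hash(
    key=dht.Hash("sample 0"), trackaccuracy=True, finishwithfirstvalue=True)
print(value == "sample 0", aggr_delay)
```

`dhtRetrievals.py` wraps exactly this loop with bookkeeping: per-sample CSV rows, a progress bar, and the network-level connection metrics exported at the end of each run.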
+ diff --git a/DHT/__init__.py b/DHT/__init__.py new file mode 100644 index 0000000..2fe8066 --- /dev/null +++ b/DHT/__init__.py @@ -0,0 +1 @@ +from plots import * \ No newline at end of file diff --git a/DHT/dhtRetrievals.py b/DHT/dhtRetrievals.py new file mode 100644 index 0000000..472e5e3 --- /dev/null +++ b/DHT/dhtRetrievals.py @@ -0,0 +1,270 @@ +import time +import progressbar +import random +import numpy as np +import pandas as pd +import dht +from utils import getStrFromDelayRange + +TOTAL_PERCENTAGE = 100 +PERCENTAGE_INTERVALS = 1 + + +class SingleDHTretrievalStudy: + + def __init__(self, csvFolder, imgFolder, jobs, nn, rn, samples, + fErrR, sErrR, cDelR, fDelR, sDelR, k, a, b, y, stepsToStop): + self.csvFolder = csvFolder + self.imgFolder = imgFolder + self.jobs = jobs + self.nn = nn + self.rn = rn + self.samples = samples + self.fastErrorRate = fErrR + self.slowErrorRate = sErrR + self.connDelayRange = cDelR + self.fastDelayRange = fDelR + self.slowDelayRange = sDelR # timeouts + self.k = k + self.alpha = a + self.beta = b + self.gamma = y + self.stepsToStop = stepsToStop + # namings + s = "" + s += f"_nn{nn}" + s += f"_rn{rn}" + s += f"_sampl{samples}" + s += f"_fer{fErrR}" + s += f"_ser{sErrR}" + s += f"_cdr{getStrFromDelayRange(cDelR)}" + s += f"_fdr{getStrFromDelayRange(fDelR)}" + s += f"_sdr{getStrFromDelayRange(sDelR)}" + s += f"_k{k}" + s += f"_a{a}" + s += f"_b{b}" + s += f"_y{y}" + s += f"_steps{stepsToStop}" + self.studyName = s + print(f"Retrieval Study => {s}") + + def run(self): + # Init the DHT Network + testInitTime = time.time() + network = dht.DHTNetwork( + 0, + self.fastErrorRate, + self.slowErrorRate, + self.connDelayRange, + self.fastDelayRange, + self.slowDelayRange, + self.gamma) + initStartTime = time.time() + network.init_with_random_peers( + self.jobs, + self.nn, + self.k, + self.alpha, + self.beta, + self.stepsToStop) + self.networkInitTime = time.time() - initStartTime + print(f"network init done in {self.networkInitTime} secs") + + # get random node to propose publish the + builderNode = network.nodestore.get_node(random.randint(0, self.nn)) + + # create and publish @@@ number of samples to the network + # lookups metrics + ks = [] + nns = [] + stepstostops = [] + fastErrorRate = [] + slowErrorRate = [] + connDelayRange = [] + fastDelayRange = [] + slowDelayRange = [] + alphas = [] + betas = [] + gammas = [] + providers = [] + sampleNames = [] + provideLookupAggrTime = [] + provideAggrTime = [] + provideOperationAggrTime = [] + provideSuccNodes = [] + provideFailedNodes = [] + samples = [] + + for i in range(self.samples): + sampleContent = f"sample {i}" + summary, _ = builderNode.provide_block_segment(sampleContent) + samples.append((sampleContent, sampleContent, summary)) + # add metrics for the csv + ks.append(self.k) + alphas.append(self.alpha) + betas.append(self.beta) + gammas.append(self.gamma) + nns.append(self.nn) + stepstostops.append(self.stepsToStop) + fastErrorRate.append(f"{self.fastErrorRate}") + slowErrorRate.append(f"{self.slowErrorRate}") + connDelayRange.append(f"{getStrFromDelayRange(self.connDelayRange)}") + fastDelayRange.append(f"{getStrFromDelayRange(self.fastDelayRange)}") + slowDelayRange.append(f"{getStrFromDelayRange(self.slowDelayRange)}") + providers.append(builderNode.ID) + sampleNames.append(sampleContent) + provideLookupAggrTime.append(summary['lookupDelay']) + provideAggrTime.append(summary['provideDelay']) + provideOperationAggrTime.append(summary['operationDelay']) + 
provideSuccNodes.append(len(summary['succesNodeIDs'])) + provideFailedNodes.append(len(summary['failedNodeIDs'])) + + # save the provide data + df = pd.DataFrame({ + "number_nodes": nns, + "k": ks, + "alpha": alphas, + "beta": betas, + "gamma": gammas, + "stop_steps": stepstostops, + "fast_error_rate": fastErrorRate, + "slow_error_rate": slowErrorRate, + "connection_delay_range": connDelayRange, + "fast_delay_range": fastDelayRange, + "slow_delay": slowDelayRange, + "provider": providers, + "sample": sampleNames, + "provide_lookup_aggr_time": provideLookupAggrTime, + "provide_aggr_time": provideAggrTime, + "provide_operation_aggr_time": provideOperationAggrTime, + "provide_succ_nodes": provideSuccNodes, + "provide_fail_nodes": provideFailedNodes, + }) + + df.to_csv(self.csvFolder + f"/retrieval_provide{self.studyName}.csv") + network.reset_network_metrics() + del df + + nns = [] + ks = [] + alphas = [] + betas = [] + gammas = [] + stepstostops = [] + fastErrorRate = [] + slowErrorRate = [] + connDelayRange = [] + fastDelayRange = [] + slowDelayRange = [] + retrievers = [] + sampleNames = [] + lookupTimes = [] + lookupAggrDelays = [] + attemptedNodes = [] + finishedConnAttempts = [] + successfullCons = [] + failedCons = [] + valRetrievable = [] + totalDiscNodes = [] + accuracies = [] + + bar = progressbar.ProgressBar( + maxval=self.rn, + widgets=[progressbar.Bar('=', '[', ']'), ' ', progressbar.Percentage()]) + bar.start() + + for i in range(self.rn): + retrieverNode = network.nodestore.get_node(random.randint(0, self.nn)) + while retrieverNode.ID == builderNode.ID: + retrieverNode = network.nodestore.get_node(random.randint(0, self.nn)) + + for l in range(self.samples): + sampleContent = f"sample {l}" + sh = dht.Hash(sampleContent) + lstime = time.time() + closest, val, summary, aggrDelay = retrieverNode.lookup_for_hash( + key=sh, trackaccuracy=True, finishwithfirstvalue=True) + lduration = time.time() - lstime + + if val == sampleContent: + valRetrievable.append(1) + else: + valRetrievable.append(0) + + nns.append(self.nn) + ks.append(self.k) + alphas.append(self.alpha) + betas.append(self.beta) + gammas.append(self.gamma) + stepstostops.append(self.stepsToStop) + fastErrorRate.append(f"{self.fastErrorRate}") + slowErrorRate.append(f"{self.slowErrorRate}") + connDelayRange.append(f"{getStrFromDelayRange(self.connDelayRange)}") + fastDelayRange.append(f"{getStrFromDelayRange(self.fastDelayRange)}") + slowDelayRange.append(f"{getStrFromDelayRange(self.slowDelayRange)}") + retrievers.append(retrieverNode.ID) + sampleNames.append(sampleContent) + lookupTimes.append(lduration) + lookupAggrDelays.append(aggrDelay) + finishedConnAttempts.append(summary['connectionFinished']) + attemptedNodes.append(summary['connectionAttempts']) + successfullCons.append(summary['successfulCons']) + failedCons.append(summary['failedCons']) + totalDiscNodes.append(summary['totalNodes']) + accuracies.append(summary['accuracy']) + + # clean up the memory + del sh + del summary + del closest + + # percentajes + bar.update(i + 1) + + bar.finish() + + testDuration = time.time() - testInitTime + print(f"test done in {testDuration} secs") + print(f"DHT fast-init jobs:{self.jobs} done in {self.networkInitTime} secs") + print(f"{self.nn} nodes, k={self.k}, alpha={self.alpha}, {len(lookupTimes)} lookups") + print(f"mean time per lookup : {np.mean(lookupTimes)}") + print(f"mean aggr delay (secs): {np.mean(lookupAggrDelays) / 1000}") + print(f"mean contacted nodes: {np.mean(attemptedNodes)}") + print(f"time to make 
{len(lookupTimes)} lookups: {np.sum(lookupTimes)} secs") + print() + + # Create the panda objs and export the to csvs + df = pd.DataFrame({ + "number_nodes": nns, + "k": ks, + "alpha": alphas, + "beta": betas, + "gamma": gammas, + "stop_steps": stepstostops, + "fast_error_rate": fastErrorRate, + "slow_error_rate": slowErrorRate, + "connection_delay_range": connDelayRange, + "fast_delay_range": fastDelayRange, + "slow_delay": slowDelayRange, + "retriever": retrievers, + "sample": sampleNames, + "lookup_wallclock_time": lookupTimes, + "lookup_aggregated_delay": lookupAggrDelays, + "attempted_nodes": attemptedNodes, + "finished_connection_attempts": finishedConnAttempts, + "successful_connections": successfullCons, + "failed_connections": failedCons, + "total_discovered_nodes": totalDiscNodes, + "retrievable": valRetrievable, + "accuracy": accuracies, + }) + df.to_csv(self.csvFolder + f"/retrieval_lookup{self.studyName}.csv") + + # save the network metrics + networkMetrics = network.connection_metrics() + network_df = pd.DataFrame(networkMetrics) + network_df.to_csv(self.csvFolder + f"/retrieval_lookup_network{self.studyName}.csv") + + del network + del df + del network_df diff --git a/DHT/dhtSmallConf.py b/DHT/dhtSmallConf.py new file mode 100644 index 0000000..35a797d --- /dev/null +++ b/DHT/dhtSmallConf.py @@ -0,0 +1,25 @@ +# Output Folders +csvsFolder = "csvs/retrieval_test" +imgFolder = "imgs/retrieval_test" + +# Simulation +# Define the type of study that we want to perform: "retrieval" +studyType = "retrieval" + +# Network +jobs = 8 +nodeNumber = [2_000] +nodesRetrieving = [100] +samples = [20] +fastErrorRate = [10] +slowErrorRate = [0] +connectionDelayRange = [range(50, 76, 1)] # ms +fastDelayRange = [range(50, 101, 1)] # ms +slowDelays = [None] # ms +gammas = [0.125] # ms + +# DHT config +ks = [20] +alphas = [3] +betas = [20] +stepsToStops = [3] diff --git a/DHT/dhtStudy.py b/DHT/dhtStudy.py new file mode 100644 index 0000000..9ffbe3f --- /dev/null +++ b/DHT/dhtStudy.py @@ -0,0 +1,77 @@ +import gc +import os +import sys +import time +import importlib +import itertools +from dhtRetrievals import SingleDHTretrievalStudy + + +def study(config): + studyStartTime = time.time() + + for nn, nr, samples, fastErrR, slowErrR, connDelayR, fastDelayR, slowD, k, a, b, y, steps4stop in itertools.product( + config.nodeNumber, + config.nodesRetrieving, + config.samples, + config.fastErrorRate, + config.slowErrorRate, + config.connectionDelayRange, + config.fastDelayRange, + config.slowDelays, + config.ks, + config.alphas, + config.betas, + config.gammas, + config.stepsToStops): + + if config.studyType == "retrieval": + singleStudy = SingleDHTretrievalStudy( + config.csvsFolder, + config.imgFolder, + config.jobs, + nn, + nr, + samples, + fastErrR, + slowErrR, + connDelayR, + fastDelayR, + slowD, + k, + a, + b, + y, + steps4stop) + else: + print(f"study type not recognized: {config.studyType}") + exit(1) + + # if the study type is correct, run the simulation + singleStudy.run() + + # clean up memory + del singleStudy + _ = gc.collect() + + print(f"done with the studies in {time.time() - studyStartTime}") + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("please provide a configuration file") + + try: + config = importlib.import_module(sys.argv[1]) + except ModuleNotFoundError as e: + try: + config = importlib.import_module(str(sys.argv[1]).replace(".py", "")) + except ModuleNotFoundError as e: + print(e) + print("You need to pass a configuration file in parameter") + exit(1) + + # Make 
sure that the output folders exist + for folder in [config.csvsFolder, config.imgFolder]: + os.makedirs(folder, exist_ok=True) + study(config) diff --git a/DHT/plots.py b/DHT/plots.py new file mode 100644 index 0000000..6a24a3b --- /dev/null +++ b/DHT/plots.py @@ -0,0 +1,265 @@ +import os +import numpy as np +import pandas as pd +import seaborn as sns +import matplotlib.pyplot as plt +from IPython.display import display + +# Tag Identifiers +RETRIEVAL = "retrieval" +LOOKUP = "lookup" +NETWORK = "network" +NN = "nn" +RN = "rn" +SAMPL = "sampl" +FER = "fer" +SER = "ser" +CDR = "cdr" +FDR = "fdr" +SDR = "sdr" +K = "k" +A = "a" +B = "b" +Y = "y" +STEPS = "steps" +# -- +OPERATION = "operation" +NUMBER_NODES = "number_nodes" +RETRIEVAL_NODES = "retrieval_nodes" +CONCURRENT_SAMPLES = "concurrent_samples" +FAST_ERROR_RATE = "fast_error_rate" +SLOW_ERROR_RATE = "slow_error_rate" +CONNECTION_DELAYS = "connection_delays" +FAST_ERROR_DELAYS = "fast_error_delays" +SLOW_ERROR_DELAYS = "slow_error_delays" +K_PARAMETER = "k_replication" +ALPHA = "alpha" +BETA = "beta" +GAMMA = "overhead" +STEPS_TO_STOP = "steps_to_stop" + + +# Utils +tag_example = "retrieval_lookup_nn12000_rn1_sampl100_fer10_ser0_cdr50-75_fdr50-100_sdr0_k20_a3_b20_y1.0_steps3" +def tag_parser(tag: str): + params = { + OPERATION: "", + NUMBER_NODES: "", + RETRIEVAL_NODES: "", + CONCURRENT_SAMPLES: "", + FAST_ERROR_RATE: "", + SLOW_ERROR_RATE: "", + CONNECTION_DELAYS: "", + FAST_ERROR_DELAYS: "", + SLOW_ERROR_DELAYS: "", + K_PARAMETER: "", + ALPHA: "", + BETA: "", + GAMMA: "", + STEPS_TO_STOP: "", + } + # split the tag into - type & parameters + raw_params = tag.split("_") + for param in raw_params: + if NN in param: + params[NUMBER_NODES] = param.replace(NN, "") + elif RN in param: + params[RETRIEVAL_NODES] = param.replace(RN, "") + elif SAMPL in param: + params[CONCURRENT_SAMPLES] = param.replace(SAMPL, "") + elif FER in param: + params[FAST_ERROR_RATE] = param.replace(FER, "") + elif SER in param: + params[SLOW_ERROR_RATE] = param.replace(SER, "") + elif CDR in param: + params[CONNECTION_DELAYS] = param.replace(CDR, "") + elif FDR in param: + params[FAST_ERROR_DELAYS] = param.replace(FDR, "") + elif SDR in param: + params[SLOW_ERROR_DELAYS] = param.replace(SDR, "") + elif K in param and param != "lookup": + params[K_PARAMETER] = param.replace(K, "") + elif A in param: + params[ALPHA] = param.replace(A, "") + elif B in param: + params[BETA] = param.replace(B, "") + elif Y in param: + params[GAMMA] = param.replace(Y, "") + elif STEPS in param: + params[STEPS_TO_STOP] = param.replace(STEPS, "") + else: + if params[OPERATION] == "": + params[OPERATION] = param + else: + params[OPERATION] += f"_{param}" + return params + +def compose_legend(params, labels): + legend = "" + for label in labels: + if legend == "": + legend = f"{label}={params[label]}" + else: + legend += f" {label}={params[label]}" + return legend + +def make_folder(folder, reason): + try: + os.mkdir(folder) + print(f"created folder {folder} for {reason}") + except FileExistsError: + print(f"folder {folder} was already created") + except Exception as e: + print(e) + + +# --- Single Metrics --- +class SingleMetrics: + + metrics = { + "lookup_aggregated_delay": { + "title_tag": "delay", + "xlabel_tag": "delay (ms)", + "ylabel_tag": "", + }, + "finished_connection_attempts": { + "title_tag": "hops", + "xlabel_tag": "hops", + "ylabel_tag": "", + }, + "accuracy": { + "title_tag": "accuracy", + "xlabel_tag": "accuracy", + "ylabel_tag": "", + }, + } + + def __init__(self, file, 
output_image_folder, operation, metrics: dict = dict()): + self.file = file + self.df = pd.read_csv(file) + self.label = file.split("/")[-1].replace(".csv", "") + self.targetFolder = output_image_folder+"/"+self.label + self.operation = operation + # add metrics to pre-existing ones + self.metrics.update(metrics) + # Make sure there is a valid folder for the imgaes + make_folder(self.targetFolder, f"for keeping the lookup related images about {self.label}\n") + print(f"plotting {self.label}, saving figures at {self.targetFolder}\n") + # display the lookup wallclock cdf + + # display the aggregated delay cdf + for metric_name, metric_opts in self.metrics.items(): + self.plot_cdf(metric_name, metric_opts) + self.plot_pdf(metric_name, metric_opts) + + def plot_cdf(self, column_name, column_opts): + df = self.df.sort_values(column_name) + # CDF + sns.set() + g = sns.lineplot(data=df, x=column_name, y=np.linspace(0, 1, len(df)), color='red', ci=None) + g.set(title=f"Simulated {self.operation} {column_name} CDF ({self.label})", + xlabel=f"Simulated {column_opts['xlabel_tag']}", ylabel=f"{self.operation} {column_opts['ylabel_tag']}") + fig = g.get_figure() + fig.savefig(self.targetFolder+f"/{self.operation.lower()}_{column_name}_cdf.png") + plt.show() + + def plot_pdf(self, column_name, column_opts): + df = self.df.sort_values(column_name) + # Histogram + bins = 8 + sns.set() + g = sns.histplot(x=df[column_name], bins=bins) + g.set(title=f"Simulated lookup {column_name} PDF ({self.label})", + xlabel=f"Simulated {column_opts['xlabel_tag']}", ylabel=f"Lookups {column_opts['ylabel_tag']}") + fig = g.get_figure() + fig.savefig(self.targetFolder + f"/lookup_{column_name}_pdf.png") + plt.show() + + +# --- Multiple Aggregators --- +class CombinedMetrics: + metrics = { + "lookup_aggregated_delay": { + "title_tag": "delay", + "xlabel_tag": "delay (ms)", + "ylabel_tag": "", + }, + "finished_connection_attempts": { + "title_tag": "hops", + "xlabel_tag": "hops", + "ylabel_tag": "", + }, + "accuracy": { + "title_tag": "accuracy", + "xlabel_tag": "accuracy", + "ylabel_tag": "", + }, + } + + def __init__(self, files, aggregator, filters, operation, output_image_folder, metrics, legend): + self.files = files + self.dfs = [] + self.tags = [] + self.params = [] + self.tag = aggregator + self.filters = filters + self.operation = operation + # add metrics to pre-existing ones + self.metrics.update(metrics) + for file in files: + if any(filter not in file for filter in filters): + continue + + self.dfs.append(pd.read_csv(file)) + raw_tag = file.split("/")[-1].replace(".csv", "") + params = tag_parser(raw_tag) + tag = compose_legend(params, legend) + self.params.append(params) + self.tags.append(tag) + + self.udf = self.unify_dfs(self.dfs) # unified dataframe + + self.targetFolder = output_image_folder+f"/{self.operation.lower}_comparison_{aggregator}" + make_folder(self.targetFolder, f"for keeping the {self.operation} related images about {self.tag}\n") + print(f"plotting by {aggregator}, saving figures at {self.targetFolder}\n") + + # --- plotting sequence --- + for metrics_name, metrics_opts in self.metrics.items(): + self.plot_cdfs_by(aggregator, metrics_name, metrics_opts) + self.plot_pdfs_by(aggregator, metrics_name, metrics_opts) + + def unify_dfs(self, dfs): + return pd.concat(dfs) + + def plot_cdfs_by(self, aggregator_tag, column_name, column_opts): + # CDF + sns.set() + palette = sns.color_palette(n_colors=len(self.dfs)) + for i, df in enumerate(self.dfs): + df = df.sort_values(column_name) + g = 
sns.lineplot(data=df, x=column_name, y=np.linspace(0, 1, len(df)), label=self.tags[i], + ci=None, color=palette[i]) + g.set(title=f"Simulated {self.operation} {column_opts['title_tag']} CDF (by {aggregator_tag})", + xlabel=f"Simulated {column_opts['xlabel_tag']}", + ylabel=f"{self.operation} {column_opts['ylabel_tag']} CDF") + plt.legend(loc='lower center', ncols=1, bbox_to_anchor=(0.5, -0.2+(-0.065*len(self.dfs)))) + fig = g.get_figure() + fig.savefig(self.targetFolder+f"/simulated_{self.operation.lower()}_{column_name}_cdf.png") + plt.show() + + def plot_pdfs_by(self, aggregator_tag, column_name, column_opts): + # Histogram + sns.set() + by_aggregator = self.udf.groupby([column_name, aggregator_tag]).count() + df = by_aggregator.reset_index() + g = sns.histplot(data=df, x=df[column_name]) + """ + g = sns.barplot(data=df, x=df[column_name], y="Unnamed: 0", hue=aggregator_tag, width=1.2) + """ + g.set(title=f"Simulated {self.operation} {column_opts['title_tag']} PDF (by {aggregator_tag})", + xlabel=f"Simulated {column_opts['xlabel_tag']}", + ylabel=f"{self.operation} {column_opts['ylabel_tag']}") + plt.legend(loc='lower center', ncols=1, bbox_to_anchor=(0.5, -0.2+(-0.065*len(self.dfs)))) + fig = g.get_figure() + fig.savefig(self.targetFolder+f"/simulated_{self.operation.lower()}_{column_name}_hist.png") + plt.show() diff --git a/DHT/requirements.txt b/DHT/requirements.txt new file mode 100644 index 0000000..681969d --- /dev/null +++ b/DHT/requirements.txt @@ -0,0 +1,116 @@ +anyio==4.1.0 +argon2-cffi==23.1.0 +argon2-cffi-bindings==21.2.0 +arrow==1.3.0 +asttokens==2.4.1 +async-lru==2.0.4 +attrs==23.1.0 +Babel==2.13.1 +beautifulsoup4==4.12.2 +bitarray==2.8.0 +bleach==6.1.0 +certifi==2023.11.17 +cffi==1.16.0 +charset-normalizer==3.3.2 +comm==0.2.0 +contourpy==1.2.0 +cycler==0.12.1 +debugpy==1.8.0 +decorator==5.1.1 +defusedxml==0.7.1 +dicttoxml==1.7.16 +exceptiongroup==1.2.0 +executing==2.0.1 +fastjsonschema==2.19.0 +fonttools==4.45.1 +fqdn==1.5.1 +idna==3.6 +ipykernel==6.27.1 +ipython==8.18.1 +ipywidgets==8.1.1 +isoduration==20.11.0 +jedi==0.19.1 +Jinja2==3.1.2 +joblib==1.2.0 +json5==0.9.14 +jsonpointer==2.4 +jsonschema==4.20.0 +jsonschema-specifications==2023.11.1 +jupyter==1.0.0 +jupyter-console==6.6.3 +jupyter-events==0.9.0 +jupyter-lsp==2.2.1 +jupyter_client==8.6.0 +jupyter_core==5.5.0 +jupyter_server==2.11.1 +jupyter_server_terminals==0.4.4 +jupyterlab==4.0.9 +jupyterlab-widgets==3.0.9 +jupyterlab_pygments==0.3.0 +jupyterlab_server==2.25.2 +kiwisolver==1.4.5 +MarkupSafe==2.1.3 +matplotlib==3.8.2 +matplotlib-inline==0.1.6 +mistune==3.0.2 +mplfinance==0.12.9b7 +nbclient==0.9.0 +nbconvert==7.11.0 +nbformat==5.9.2 +nest-asyncio==1.5.8 +networkx==3.2.1 +notebook==7.0.6 +notebook_shim==0.2.3 +numpy==1.26.2 +overrides==7.4.0 +packaging==23.2 +pandas==2.1.3 +pandocfilters==1.5.0 +parso==0.8.3 +pexpect==4.9.0 +Pillow==10.1.0 +platformdirs==4.0.0 +plotly==5.18.0 +progressbar==2.5 +prometheus-client==0.19.0 +prompt-toolkit==3.0.41 +psutil==5.9.6 +ptyprocess==0.7.0 +pure-eval==0.2.2 +pycparser==2.21 +Pygments==2.17.2 +pyparsing==3.1.1 +python-dateutil==2.8.2 +python-json-logger==2.0.7 +pytz==2023.3.post1 +PyYAML==6.0.1 +pyzmq==25.1.1 +qtconsole==5.5.1 +QtPy==2.4.1 +referencing==0.31.1 +requests==2.31.0 +rfc3339-validator==0.1.4 +rfc3986-validator==0.1.1 +rpds-py==0.13.2 +seaborn==0.13.0 +Send2Trash==1.8.2 +six==1.16.0 +sniffio==1.3.0 +soupsieve==2.5 +stack-data==0.6.3 +tenacity==8.2.3 +terminado==0.18.0 +tinycss2==1.2.1 +tomli==2.0.1 +tornado==6.4 +traitlets==5.14.0 +types-python-dateutil==2.8.19.14 
+typing_extensions==4.8.0 +tzdata==2023.3 +uri-template==1.3.0 +urllib3==2.1.0 +wcwidth==0.2.12 +webcolors==1.13 +webencodings==0.5.1 +websocket-client==1.6.4 +widgetsnbextension==4.0.9 diff --git a/DHT/retrieval_on_das_plotting.ipynb b/DHT/retrieval_on_das_plotting.ipynb new file mode 100644 index 0000000..86ffaea --- /dev/null +++ b/DHT/retrieval_on_das_plotting.ipynb @@ -0,0 +1,281 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "import os\n", + "import pandas as pd\n", + "from IPython.display import display\n", + "import warnings\n", + "warnings.filterwarnings('ignore')\n", + "\n", + "from plots import make_folder, SingleMetrics, CombinedMetrics\n", + "import plots\n", + "\n", + "# Necessary folders to start\n", + "CSV_FOLDER = \"./csvs/retrieval_test\"\n", + "IMG_FOLDER = \"./imgs/retrieval_test\"\n", + "\n", + "# make sure that the output folder exists\n", + "make_folder(IMG_FOLDER, \"keeping track of the generated images\")\n" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "# Read all the available csv files in the given folder\n", + "def read_files_with(target: str):\n", + " files = []\n", + " for dir, _, files in os.walk(CSV_FOLDER):\n", + " for file in files:\n", + " if target in file:\n", + " files.append(dir+\"/\"+file)\n", + " else:\n", + " continue\n", + " print(f\"found {len(files)} with {target} files in {CSV_FOLDER}\")\n" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "markdown", + "source": [ + "## Analysis of the data\n", + "#### Individual metrics" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "# Individual metrics\n", + "def print_avg_lookup(df):\n", + " print(f\"lookup_wallclock_time\\t\\t\\t {df.lookup_wallclock_time.mean()}\")\n", + " print(f\"attempted_nodes\\t\\t\\t\\t\\t {df.attempted_nodes.mean()}\")\n", + " print(f\"finished_connection_attempts\\t {df.finished_connection_attempts.mean()}\")\n", + " print(f\"successful_connections\\t\\t\\t {df.successful_connections.mean()}\")\n", + " print(f\"failed_connections\\t\\t\\t\\t {df.failed_connections.mean()}\")\n", + " print(f\"total_discovered_nodes\\t\\t\\t {df.total_discovered_nodes.mean()}\")\n", + " print(f\"retrievable\\t\\t\\t\\t\\t\\t {df.retrievable.mean()}\")\n", + " print(f\"accuracy\\t\\t\\t\\t\\t\\t {df.accuracy.mean()}\")\n", + "\n", + "\n", + "# Display the sigle metrics of the test individually\n", + "files = read_files_with(\"retrieval_lookup_nn\")\n", + "for file in files:\n", + " df = pd.read_csv(file)\n", + " print(\"\\nmax simulated lookup delay\")\n", + " display(df.loc[df['lookup_aggregated_delay'].idxmax()])\n", + "\n", + " print(\"\\nmin simulated lookup delay\")\n", + " display(df.loc[df['lookup_aggregated_delay'].idxmin()])\n", + "\n", + " print(\"\\navg simulated lookup delay\")\n", + " print_avg_lookup(df)\n", + " metrics = SingleMetrics(file, IMG_FOLDER, \"Retrievals\", {\n", + " \"retrievable\": {\n", + " \"title_tag\": \"retriebable\",\n", + " \"xlabel_tag\": \"retriebable\",\n", + " \"ylabel_tag\": \"\",\n", + " },})" + ], + "metadata": { + "collapsed": false, + "is_executing": true + } + }, + { + "cell_type": "markdown", + "source": [ + "#### Aggregated accross samples" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "# aggregate 
metrics across runs\n", + "files = read_files_with(\"lookup_nn\")\n", + "unifiedMetrics = CombinedMetrics(\n", + " files=files, aggregator=\"fast_delay_range\",\n", + " operation=\"retrieval\",\n", + " filters=[\"y0.125\", \"cdr50-75\"], output_image_folder=IMG_FOLDER,\n", + " metrics={\n", + " \"retrievable\": {\n", + " \"title_tag\": \"retriebable\",\n", + " \"xlabel_tag\": \"retriebable\",\n", + " \"ylabel_tag\": \"\",\n", + " },\n", + " },\n", + " legend=[\n", + " plots.RETRIEVAL_NODES,\n", + " plots.CONCURRENT_SAMPLES,\n", + " plots.FAST_ERROR_RATE,\n", + " plots.CONNECTION_DELAYS,\n", + " plots.GAMMA,\n", + " ])" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "# example to reproduce the network details\n", + "import seaborn as sns\n", + "import matplotlib.pyplot as plt\n", + "\n", + "\n", + "file = CSV_FOLDER+\"/retrieval_lookup_network_nn12000_rn100_sampl100_fer10_ser0_cdr50-75_fdr50-100_sdr0_k20_a3_b20_y0.125_steps3.csv\"\n", + "\n", + "df = pd.read_csv(file)\n", + "data = df.groupby([\"from\", \"to\"]).count()\n", + "data = data.reset_index()\n", + "data = data.rename(columns={\"Unnamed: 0\": \"total_connections\"})\n", + "data = data.sort_values(by=\"total_connections\", ascending=False)\n", + "pivoted_data = data.pivot(index=\"from\", columns=\"to\", values=\"total_connections\").fillna(0)\n", + "display(pivoted_data)\n", + "\n", + "# plot heatmap of connections\n", + "cmap = sns.cm.rocket_r\n", + "\n", + "sns.set()\n", + "plt.show()\n", + "g = sns.heatmap(data=pivoted_data, xticklabels=\"to\", yticklabels=\"from\", cmap = cmap)\n" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "# example to reproduce the network details\n", + "import networkx as nx\n", + "import plotly.graph_objects as go\n", + "\n", + "\n", + "file = CSV_FOLDER+\"/retrieval_lookup_network_nn12000_rn100_sampl100_fer10_ser0_cdr50-75_fdr50-100_sdr0_k20_a3_b20_y0.125_steps3.csv\"\n", + "\n", + "df = pd.read_csv(file)\n", + "df = df.groupby([\"from\", \"to\"]).size().reset_index(name=\"count\")\n", + "top_interactions = df.sort_values('count', ascending=False).head(10000) # top 10000 interactions\n", + "display(top_interactions)\n", + "\n", + "G = nx.from_pandas_edgelist(top_interactions, 'to', 'from', ['count'])\n", + "pos = nx.spring_layout(G)\n", + "\n", + "for node in G.nodes():\n", + " G.nodes[node]['pos'] = list(pos[node])\n", + "\n", + "edge_x = []\n", + "edge_y = []\n", + "for edge in G.edges():\n", + " x0, y0 = G.nodes[edge[0]]['pos']\n", + " x1, y1 = G.nodes[edge[1]]['pos']\n", + " edge_x.extend([x0, x1, None])\n", + " edge_y.extend([y0, y1, None])\n", + "\n", + "node_x = [pos[node][0] for node in G.nodes()]\n", + "node_y = [pos[node][1] for node in G.nodes()]\n", + "\n", + "edge_trace = go.Scatter(\n", + " x=edge_x, y=edge_y,\n", + " line=dict(width=0.5, color='#888'),\n", + " hoverinfo='none',\n", + " mode='lines')\n", + "\n", + "node_trace = go.Scatter(\n", + " x=node_x, y=node_y,\n", + " mode='markers',\n", + " hoverinfo='text',\n", + " marker=dict(\n", + " showscale=True,\n", + " colorscale='YlGnBu',\n", + " size=10,\n", + " colorbar=dict(\n", + " thickness=15,\n", + " title='Node Connections',\n", + " xanchor='left',\n", + " titleside='right'\n", + " ),\n", + " line_width=2))\n", + "\n", + "node_adjacencies = []\n", + "node_text = []\n", + "for node in G.nodes():\n", + " adjacencies = list(G.adj[node]) # List of nodes 
adjacent to the current node\n", + " num_connections = len(adjacencies)\n", + "\n", + " node_adjacencies.append(num_connections)\n", + " node_text.append(f'Node id: {node}
# of connections: {num_connections}')\n", + "\n", + "node_trace.marker.color = node_adjacencies\n", + "node_trace.text = node_text\n", + "\n", + "fig = go.Figure(data=[edge_trace, node_trace],\n", + " layout=go.Layout(\n", + " title='Network of Top Address Interactions',\n", + " titlefont_size=16,\n", + " showlegend=False,\n", + " hovermode='closest',\n", + " margin=dict(b=0, l=0, r=0, t=0),\n", + " annotations=[dict(\n", + " text=\"Based on top interactions\",\n", + " showarrow=False,\n", + " xref=\"paper\", yref=\"paper\",\n", + " x=0.005, y=-0.002)],\n", + " xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),\n", + " yaxis=dict(showgrid=False, zeroline=False, showticklabels=False))\n", + " )\n", + "fig.update_layout(title_text=\"DHT network's interactions\")\n", + "fig.show()\n" + ], + "metadata": { + "collapsed": false + } + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/DHT/utils.py b/DHT/utils.py new file mode 100644 index 0000000..0ea8c3f --- /dev/null +++ b/DHT/utils.py @@ -0,0 +1,7 @@ + +def getStrFromDelayRange(range): + if range == None: + delay = "0" + else: + delay = f"{range[0]}-{range[-1]}" + return delay diff --git a/README.md b/README.md index d158e01..4ddf7a9 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ cd das-research ``` python3 -m venv myenv source myenv/bin/activate -pip3 install -r DAS/requirements.txt +bash install_dependencies.sh ``` ## Run the simulator diff --git a/install_dependencies.sh b/install_dependencies.sh new file mode 100644 index 0000000..47e38d2 --- /dev/null +++ b/install_dependencies.sh @@ -0,0 +1,21 @@ +VENV="./venv" + +echo "Installing dependencies for DAS..." + +# activate the venv or raise error if error +source $VENV/bin/activate +if [ $? -eq 0 ]; then + echo "venv successfully sourced" +else + echo "unable to source venv at $VENV , does it exist?" + exit 1 +fi + +# make sure that the submodule module is correctly downloaded +git submodule update --init + +# install requirements for DAS and py-dht and install the dht module from py-dht +pip3 install -r DAS/requirements.txt +pip3 install -r DHT/requirements.txt +pip3 install -r py-dht/requirements.txt +python -m pip install -e py-dht/ diff --git a/py-dht b/py-dht new file mode 160000 index 0000000..9da9d74 --- /dev/null +++ b/py-dht @@ -0,0 +1 @@ +Subproject commit 9da9d74c95ab5f24a3ffa0605560ed5b77a7901b diff --git a/study.py b/study.py index fff5205..476d19c 100644 --- a/study.py +++ b/study.py @@ -1,6 +1,7 @@ #! /bin/python3 import time, sys, random, copy +from datetime import datetime import importlib import subprocess from joblib import Parallel, delayed @@ -33,9 +34,8 @@ def runOnce(config, shape, execID): sim.initLogger() sim.initValidators() sim.initNetwork() - result = sim.run() + result = sim.runBlockBroadcasting() sim.logger.info("Shape: %s ... 
Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) - if config.dumpXML: result.dump() @@ -79,7 +79,7 @@ def study(): logger.info("Starting simulations:", extra=format) start = time.time() - results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape()) + results = Parallel(config.numJobs)(delayed(runOnce)(config, shape, execID) for shape in config.nextShape()) end = time.time() logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)
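A closing note on the DHT configuration files: the new `dhtStudy.py` treats every network and DHT parameter in the config module as a list and sweeps their Cartesian product with `itertools.product`, running one `SingleDHTretrievalStudy` per combination. As a sketch, a hypothetical config like the one below (field names taken from `dhtSmallConf.py`, values illustrative) would therefore launch four runs: two network sizes times two `k` values.

```python
# myDhtConf.py -- hypothetical sweep config; field names follow dhtSmallConf.py

# Output folders (created by dhtStudy.py if missing)
csvsFolder = "csvs/retrieval_sweep"
imgFolder = "imgs/retrieval_sweep"

# Simulation
studyType = "retrieval"

# Network (each list below is swept independently)
jobs = 8
nodeNumber = [2_000, 12_000]
nodesRetrieving = [100]
samples = [20]
fastErrorRate = [10]                        # %
slowErrorRate = [0]                         # %
connectionDelayRange = [range(50, 76, 1)]   # ms
fastDelayRange = [range(50, 101, 1)]        # ms
slowDelays = [None]
gammas = [0.125]

# DHT config
ks = [20, 40]
alphas = [3]
betas = [20]
stepsToStops = [3]
```

It would be run the same way as the small config: `das-research/DHT$ python3 dhtStudy.py myDhtConf.py`.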