Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/000.microbenchmarks/010.sleep/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
def buckets_count():
return (0, 0)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func):
return { 'sleep': size_generators[size] }
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
def buckets_count():
return (0, 1)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func):
return {'output-bucket': output_buckets[0]}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
def buckets_count():
return (0, 1)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func):
return {'output-bucket': output_buckets[0]}
2 changes: 1 addition & 1 deletion benchmarks/000.microbenchmarks/040.server-reply/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
def buckets_count():
return (0, 0)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func):
return { 'sleep': size_generators[size] }
61 changes: 61 additions & 0 deletions benchmarks/wrappers/knative/nodejs/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
const {
CloudEvent,
HTTP
} = require('cloudevents');
const path = require('path');
const fs = require('fs');
const {
v4: uuidv4
} = require('uuid');

async function handle(context, event) {
const requestId = uuidv4();

// Ensure event data is parsed correctly
const eventData = event ? event : context.body;
context.log.info(`Received event: ${JSON.stringify(eventData)}`);

const func = require('/function/function.js');
const begin = Date.now() / 1000;
const start = process.hrtime();

try {
// Call the handler function with the event data
const ret = await func.handler(eventData);
const elapsed = process.hrtime(start);
const end = Date.now() / 1000;
const micro = elapsed[1] / 1e3 + elapsed[0] * 1e6;

let is_cold = false;
const fname = path.join('/tmp', 'cold_run');
if (!fs.existsSync(fname)) {
is_cold = true;
fs.closeSync(fs.openSync(fname, 'w'));
}

context.log.info(`Function result: ${JSON.stringify(ret)}`);

return {
begin: begin,
end: end,
compute_time: micro,
results_time: 0,
result: ret,
request_id: requestId,
is_cold: is_cold,
};
} catch (error) {
context.log.error(`Error - invocation failed! Reason: ${error.message}`);
return {
begin: begin,
end: Date.now() / 1000,
compute_time: process.hrtime(start),
results_time: 0,
result: `Error - invocation failed! Reason: ${error.message}`,
request_id: requestId,
is_cold: false,
};
}
}

exports.handle = handle;
60 changes: 60 additions & 0 deletions benchmarks/wrappers/knative/nodejs/storage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
const minio = require('minio'),
path = require('path'),
uuid = require('uuid'),
util = require('util'),
stream = require('stream'),
fs = require('fs');

class minio_storage {

constructor() {
let address = process.env.MINIO_STORAGE_CONNECTION_URL;
let access_key = process.env.MINIO_STORAGE_ACCESS_KEY;
let secret_key = process.env.MINIO_STORAGE_SECRET_KEY;

this.client = new minio.Client({
endPoint: address.split(':')[0],
port: parseInt(address.split(':')[1], 10),
accessKey: access_key,
secretKey: secret_key,
useSSL: false
});
}

unique_name(file) {
let name = path.parse(file);
let uuid_name = uuid.v4().split('-')[0];
return path.join(name.dir, util.format('%s.%s%s', name.name, uuid_name, name.ext));
}

upload(bucket, file, filepath) {
let uniqueName = this.unique_name(file);
return [uniqueName, this.client.fPutObject(bucket, uniqueName, filepath)];
};

download(bucket, file, filepath) {
return this.client.fGetObject(bucket, file, filepath);
};

uploadStream(bucket, file) {
var write_stream = new stream.PassThrough();
let uniqueName = this.unique_name(file);
let promise = this.client.putObject(bucket, uniqueName, write_stream, write_stream.size);
return [write_stream, promise, uniqueName];
};

downloadStream(bucket, file) {
var read_stream = new stream.PassThrough();
return this.client.getObject(bucket, file);
};

static get_instance() {
if (!this.instance) {
this.instance = new storage();
}
return this.instance;
}


};
exports.storage = minio_storage;
58 changes: 58 additions & 0 deletions benchmarks/wrappers/knative/python/func.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging
import datetime
import os
import uuid
from flask import jsonify
from parliament import Context
import minio


def main(context: Context):
logging.getLogger().setLevel(logging.INFO)
begin = datetime.datetime.now() # Initialize begin outside the try block

event = context.request.json
logging.info(f"Received event: {event}")

request_id = str(uuid.uuid4()) # Generate a unique request ID

try:
from function import function

# Update the timestamp after extracting JSON data
begin = datetime.datetime.now()
# Pass the extracted JSON data to the handler function
ret = function.handler(event)
end = datetime.datetime.now()
logging.info("Function result: {}".format(ret))
log_data = {"result": ret["result"]}
if "measurement" in ret:
log_data["measurement"] = ret["measurement"]
results_time = (end - begin) / datetime.timedelta(microseconds=1)

is_cold = False
fname = "cold_run"
if not os.path.exists(fname):
is_cold = True
open(fname, "a").close()

return {
"request_id": request_id,
"begin": begin.strftime("%s.%f"),
"end": end.strftime("%s.%f"),
"results_time": results_time,
"is_cold": is_cold,
"result": log_data,
}

except Exception as e:
end = datetime.datetime.now()
results_time = (end - begin) / datetime.timedelta(microseconds=1)
logging.error(f"Error - invocation failed! Reason: {e}")
return {
"request_id": request_id,
"begin": begin.strftime("%s.%f"),
"end": end.strftime("%s.%f"),
"results_time": results_time,
"result": f"Error - invocation failed! Reason: {e}",
}
77 changes: 77 additions & 0 deletions benchmarks/wrappers/knative/python/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import os
import uuid
import json
import minio
import logging


class storage:
instance = None
client = None

def __init__(self):
try:
"""
Minio does not allow another way of configuring timeout for connection.
The rest of configuration is copied from source code of Minio.
"""
import urllib3
from datetime import timedelta

timeout = timedelta(seconds=1).seconds

mgr = urllib3.PoolManager(
timeout=urllib3.util.Timeout(connect=timeout, read=timeout),
maxsize=10,
retries=urllib3.Retry(
total=5, backoff_factor=0.2, status_forcelist=[500, 502, 503, 504]
),
)
self.client = minio.Minio(
os.getenv("MINIO_STORAGE_CONNECTION_URL"),
access_key=os.getenv("MINIO_STORAGE_ACCESS_KEY"),
secret_key=os.getenv("MINIO_STORAGE_SECRET_KEY"),
secure=False,
http_client=mgr,
)
except Exception as e:
logging.info(e)
raise e

@staticmethod
def unique_name(name):
name, extension = os.path.splitext(name)
return "{name}.{random}{extension}".format(
name=name, extension=extension, random=str(uuid.uuid4()).split("-")[0]
)

def upload(self, bucket, file, filepath):
key_name = storage.unique_name(file)
self.client.fput_object(bucket, key_name, filepath)
return key_name

def download(self, bucket, file, filepath):
self.client.fget_object(bucket, file, filepath)

def download_directory(self, bucket, prefix, path):
objects = self.client.list_objects(bucket, prefix, recursive=True)
for obj in objects:
file_name = obj.object_name
self.download(bucket, file_name, os.path.join(path, file_name))

def upload_stream(self, bucket, file, bytes_data):
key_name = storage.unique_name(file)
self.client.put_object(
bucket, key_name, bytes_data, bytes_data.getbuffer().nbytes
)
return key_name

def download_stream(self, bucket, file):
data = self.client.get_object(bucket, file)
return data.read()

@staticmethod
def get_instance():
if storage.instance is None:
storage.instance = storage()
return storage.instance
20 changes: 20 additions & 0 deletions config/example.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@
"output_buckets": [],
"type": "minio"
}
},
"knative": {
"shutdownStorage": false,
"removeCluster": false,
"knativeExec": "func",
"docker_registry": {
"registry": "",
"username": "",
"password": ""
},
"storage": {
"address": "localhost:9000",
"mapped_port": 9000,
"access_key": "myaccesskey",
"secret_key": "mysecretkey",
"instance_id": "",
"input_buckets": [],
"output_buckets": [],
"type": "minio"
}
}
}
}
45 changes: 45 additions & 0 deletions config/systems.json
Original file line number Diff line number Diff line change
Expand Up @@ -234,5 +234,50 @@
}
}
}
},
"knative": {
"languages": {
"python": {
"base_images": {
"3.9": "python:3.9-slim",
"3.10": "python:3.10-slim"
},
"images": [],
"username": "docker_user",
"deployment": {
"files": [
"func.py",
"storage.py"
],
"packages": {
"parliament-functions": "0.1.0",
"minio": "5.0.10",
"Pillow": "9.0.0",
"python-igraph": "0.8.0",
"squiggle": "0.3.1"
}
}
},
"nodejs": {
"base_images": {
"20": "node:20",
"18": "node:18"
},
"images": [],
"username": "docker_user",
"deployment": {
"files": [
"index.js",
"storage.js"
],
"packages": {
"faas-js-runtime": "^2.2.2",
"minio": "7.0.16",
"mustache": "^3.2.1",
"sharp": "^0.25"
}
}
}
}
}
}
Loading