Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Knative deployment support #206

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion benchmarks/000.microbenchmarks/010.sleep/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
def buckets_count():
return (0, 0)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func):
return { 'sleep': size_generators[size] }
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
def buckets_count():
return (0, 1)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func):
return {'output-bucket': output_buckets[0]}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
def buckets_count():
return (0, 1)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func):
return {'output-bucket': output_buckets[0]}
2 changes: 1 addition & 1 deletion benchmarks/000.microbenchmarks/040.server-reply/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
def buckets_count():
return (0, 0)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func):
return { 'sleep': size_generators[size] }
54 changes: 54 additions & 0 deletions benchmarks/wrappers/knative/nodejs/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
const { CloudEvent, HTTP } = require('cloudevents');
const handler = require('./function').handler;

async function handle(context, event) {
const startTime = new Date();

try {
// Ensure event data is parsed correctly
const eventData = event ? event : context.body;
context.log.info(`Received event: ${JSON.stringify(eventData)}`);

// Call the handler function with the event data
const result = await handler(eventData);
const endTime = new Date();

context.log.info(`Function result: ${JSON.stringify(result)}`);
const resultTime = (endTime - startTime) / 1000; // Time in seconds

// Create a response
const response = {
begin: startTime.toISOString(),
end: endTime.toISOString(),
results_time: resultTime,
result: result
};

// Return the response
return {
data: response,
headers: { 'Content-Type': 'application/json' },
statusCode: 200
};
} catch (error) {
const endTime = new Date();
const resultTime = (endTime - startTime) / 1000; // Time in seconds

context.log.error(`Error - invocation failed! Reason: ${error.message}`);
const response = {
begin: startTime.toISOString(),
end: endTime.toISOString(),
results_time: resultTime,
result: `Error - invocation failed! Reason: ${error.message}`
};

// Return the error response
return {
data: response,
headers: { 'Content-Type': 'application/json' },
statusCode: 500
};
}
}

module.exports = handle;
63 changes: 63 additions & 0 deletions benchmarks/wrappers/knative/nodejs/storage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@

const minio = require('minio'),
path = require('path'),
uuid = require('uuid'),
util = require('util'),
stream = require('stream'),
fs = require('fs');

class minio_storage {

constructor() {
let address = process.env.MINIO_STORAGE_CONNECTION_URL;
let access_key = process.env.MINIO_STORAGE_ACCESS_KEY;
let secret_key = process.env.MINIO_STORAGE_SECRET_KEY;

this.client = new minio.Client(
{
endPoint: address.split(':')[0],
port: parseInt(address.split(':')[1], 10),
accessKey: access_key,
secretKey: secret_key,
useSSL: false
}
);
}

unique_name(file) {
let name = path.parse(file);
let uuid_name = uuid.v4().split('-')[0];
return path.join(name.dir, util.format('%s.%s%s', name.name, uuid_name, name.ext));
}

upload(bucket, file, filepath) {
let uniqueName = this.unique_name(file);
return [uniqueName, this.client.fPutObject(bucket, uniqueName, filepath)];
};

download(bucket, file, filepath) {
return this.client.fGetObject(bucket, file, filepath);
};

uploadStream(bucket, file) {
var write_stream = new stream.PassThrough();
let uniqueName = this.unique_name(file);
let promise = this.client.putObject(bucket, uniqueName, write_stream, write_stream.size);
return [write_stream, promise, uniqueName];
};

downloadStream(bucket, file) {
var read_stream = new stream.PassThrough();
return this.client.getObject(bucket, file);
};

static get_instance() {
if(!this.instance) {
this.instance = new storage();
}
return this.instance;
}
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved


};
exports.storage = minio_storage;
42 changes: 42 additions & 0 deletions benchmarks/wrappers/knative/python/func.py
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging
import datetime
from flask import jsonify
from parliament import Context
from function import handler

def main(context: Context):
logging.getLogger().setLevel(logging.INFO)
begin = datetime.datetime.now() # Initialize begin outside the try block
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

try:
# Extract JSON data from the request
event = context.request.json

# Update the timestamp after extracting JSON data
begin = datetime.datetime.now()
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
# Pass the extracted JSON data to the handler function
ret = handler(event)
end = datetime.datetime.now()
logging.info(f"Function result: {ret}")
results_time = (end - begin) / datetime.timedelta(microseconds=1)

response = {
"begin": begin.strftime("%s.%f"),
"end": end.strftime("%s.%f"),
"results_time": results_time,
"result": ret,
}

return jsonify(response), 200

except Exception as e:
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
end = datetime.datetime.now()
results_time = (end - begin) / datetime.timedelta(microseconds=1)
logging.error(f"Error - invocation failed! Reason: {e}")
response = {
"begin": begin.strftime("%s.%f"),
"end": end.strftime("%s.%f"),
"results_time": results_time,
"result": f"Error - invocation failed! Reason: {e}",
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
}
return jsonify(response), 500
80 changes: 80 additions & 0 deletions benchmarks/wrappers/knative/python/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import os
import uuid
import json
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
import minio
import logging


class storage:
instance = None
client = None

def __init__(self):
try:
"""
Minio does not allow another way of configuring timeout for connection.
The rest of configuration is copied from source code of Minio.
"""
import urllib3
from datetime import timedelta

timeout = timedelta(seconds=1).seconds

mgr = urllib3.PoolManager(
timeout=urllib3.util.Timeout(connect=timeout, read=timeout),
maxsize=10,
retries=urllib3.Retry(
total=5, backoff_factor=0.2, status_forcelist=[500, 502, 503, 504]
)
)
self.client = minio.Minio(
os.getenv("MINIO_STORAGE_CONNECTION_URL"),
access_key=os.getenv("MINIO_STORAGE_ACCESS_KEY"),
secret_key=os.getenv("MINIO_STORAGE_SECRET_KEY"),
secure=False,
http_client=mgr
)
except Exception as e:
logging.info(e)
raise e
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

@staticmethod
def unique_name(name):
name, extension = os.path.splitext(name)
return '{name}.{random}{extension}'.format(
name=name,
extension=extension,
random=str(uuid.uuid4()).split('-')[0]
)


def upload(self, bucket, file, filepath):
key_name = storage.unique_name(file)
self.client.fput_object(bucket, key_name, filepath)
return key_name
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def download(self, bucket, file, filepath):
self.client.fget_object(bucket, file, filepath)
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def download_directory(self, bucket, prefix, path):
objects = self.client.list_objects(bucket, prefix, recursive=True)
for obj in objects:
file_name = obj.object_name
self.download(bucket, file_name, os.path.join(path, file_name))
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def upload_stream(self, bucket, file, bytes_data):
key_name = storage.unique_name(file)
self.client.put_object(
bucket, key_name, bytes_data, bytes_data.getbuffer().nbytes
)
return key_name
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def download_stream(self, bucket, file):
data = self.client.get_object(bucket, file)
return data.read()
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

@staticmethod
def get_instance():
if storage.instance is None:
storage.instance = storage()
return storage.instance
20 changes: 20 additions & 0 deletions config/example.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@
"output_buckets": [],
"type": "minio"
}
},
"knative": {
"shutdownStorage": false,
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
"removeCluster": false,
"knativeExec": "func",
"docker_registry": {
"registry": "",
"username": "",
"password": ""
},
"storage": {
"address": "localhost:9000",
"mapped_port": 9000,
"access_key": "myaccesskey",
"secret_key": "mysecretkey",
"instance_id": "",
"input_buckets": [],
"output_buckets": [],
"type": "minio"
}
}
}
}
36 changes: 36 additions & 0 deletions config/systems.json
Original file line number Diff line number Diff line change
Expand Up @@ -234,5 +234,41 @@
}
}
}
},
"knative": {
"languages": {
"python": {
"base_images": {
"3.9": "python:3.9-slim",
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
"3.10": "python:3.10-slim"
},
"images": [],
"username": "docker_user",
"deployment": {
"files": [
"func.py",
"storage.py"
],
"packages": {
"parliament-functions": "0.1.0"
}
}
},
"nodejs": {
"base_images": {
"20": "node:20",
"18": "node:18"
},
"images": [],
"username": "docker_user",
"deployment": {
"files": [
"index.js",
"storage.js"
],
"packages": []
}
}
}
}
}
10 changes: 8 additions & 2 deletions install.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
parser = argparse.ArgumentParser(description="Install SeBS and dependencies.")
parser.add_argument('--venv', metavar='DIR', type=str, default="python-venv", help='destination of local Python virtual environment')
parser.add_argument('--python-path', metavar='DIR', type=str, default="python3", help='Path to local Python installation.')
for deployment in ["aws", "azure", "gcp", "openwhisk"]:
for deployment in ["aws", "azure", "gcp", "openwhisk", "knative"]:
parser.add_argument(f"--{deployment}", action="store_const", const=True, default=True, dest=deployment)
parser.add_argument(f"--no-{deployment}", action="store_const", const=False, default=True, dest=deployment)
for deployment in ["local"]:
Expand Down Expand Up @@ -63,6 +63,13 @@ def execute(cmd, cwd=None):
execute(f'echo "export SEBS_WITH_OPENWHISK={flag}" >> {env_dir}/bin/activate')
execute(f'echo "unset SEBS_WITH_OPENWHISK" >> {env_dir}/bin/deactivate')

if args.knative:
print("Install Python dependencies for Knative")
execute(". {}/bin/activate && pip3 install -r requirements.knative.txt".format(env_dir))
flag = "TRUE" if args.knative else "FALSE"
execute(f'echo "export SEBS_WITH_KNATIVE={flag}" >> {env_dir}/bin/activate')
execute(f'echo "unset SEBS_WITH_KNATIVE" >> {env_dir}/bin/deactivate')

if args.local:
print("Install Python dependencies for local")
execute(". {}/bin/activate && pip3 install -r requirements.local.txt".format(env_dir))
Expand Down Expand Up @@ -99,4 +106,3 @@ def execute(cmd, cwd=None):
execute("python3 setup.py build")
execute("python3 pypapi/papi_build.py")
os.chdir(cur_dir)

2 changes: 2 additions & 0 deletions requirements.knative.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
parliament-functions==0.1.0
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
flask
2 changes: 1 addition & 1 deletion sebs.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def common_params(func):
@click.option(
"--deployment",
default=None,
type=click.Choice(["azure", "aws", "gcp", "local", "openwhisk"]),
type=click.Choice(["azure", "aws", "gcp", "local", "openwhisk", "knative"]),
help="Cloud deployment to use.",
)
@click.option(
Expand Down
3 changes: 2 additions & 1 deletion sebs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def image_tag_prefix(self, tag: str):
self._image_tag_prefix = tag

def docker_repository(self) -> str:
return self._system_config["general"]["docker_repository"]
return "abhidocker12"
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def deployment_packages(self, deployment_name: str, language_name: str) -> Dict[str, str]:
return self._system_config[deployment_name]["languages"][language_name]["deployment"][
Expand Down Expand Up @@ -68,3 +68,4 @@ def benchmark_image_tag(

def username(self, deployment_name: str, language_name: str) -> str:
return self._system_config[deployment_name]["languages"][language_name]["username"]

Loading