From fca8fa2da55cd5bdccc26dd452a5ce3313a92af7 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Wed, 19 Jun 2024 14:38:28 +0530 Subject: [PATCH] feat: Knative deployment support Signed-off-by: Abhishek Kumar --- config/systems.json | 39 ++++++ sebs.py | 2 +- sebs/faas/config.py | 4 + sebs/knative/config.py | 226 ++++++++++++++++++++++++++++++ sebs/knative/function.py | 148 ++++++++++++++++++++ sebs/knative/knative.py | 114 +++++++++++++++ sebs/knative/storage.py | 25 ++++ sebs/knative/trigger.py | 295 +++++++++++++++++++++++++++++++++++++++ sebs/types.py | 1 + 9 files changed, 853 insertions(+), 1 deletion(-) create mode 100644 sebs/knative/config.py create mode 100644 sebs/knative/function.py create mode 100644 sebs/knative/knative.py create mode 100644 sebs/knative/storage.py create mode 100644 sebs/knative/trigger.py diff --git a/config/systems.json b/config/systems.json index bb21dcd9..239d2016 100644 --- a/config/systems.json +++ b/config/systems.json @@ -234,5 +234,44 @@ } } } + }, + "knative": { + "languages": { + "python": { + "base_images": { + "3.9": "python:3.9-slim", + "3.10": "python:3.10-slim" + }, + "images": [ + "build", + "run" + ], + "username": "docker_user", + "deployment": { + "files": [ + "handler.py", + "storage.py" + ], + "packages": [] + } + }, + "nodejs": { + "base_images": { + "latest": "node:latest" + }, + "images": [ + "build", + "run" + ], + "username": "docker_user", + "deployment": { + "files": [ + "handler.js", + "storage.js" + ], + "packages": [] + } + } + } } } diff --git a/sebs.py b/sebs.py index ff7f7769..b95e51d6 100755 --- a/sebs.py +++ b/sebs.py @@ -88,7 +88,7 @@ def common_params(func): @click.option( "--deployment", default=None, - type=click.Choice(["azure", "aws", "gcp", "local", "openwhisk"]), + type=click.Choice(["azure", "aws", "gcp", "local", "openwhisk", "knative"]), help="Cloud deployment to use.", ) @click.option( diff --git a/sebs/faas/config.py b/sebs/faas/config.py index 19c7d3ab..889a0102 100644 --- a/sebs/faas/config.py +++ b/sebs/faas/config.py @@ -204,6 +204,10 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config from sebs.openwhisk.config import OpenWhiskConfig implementations["openwhisk"] = OpenWhiskConfig.deserialize + if has_platform("knative"): + from sebs.knative.config import KnativeConfig + + implementations["knative"] = KnativeConfig.deserialize func = implementations.get(name) assert func, "Unknown config type!" return func(config[name] if name in config else config, cache, handlers) diff --git a/sebs/knative/config.py b/sebs/knative/config.py new file mode 100644 index 00000000..ffa023c0 --- /dev/null +++ b/sebs/knative/config.py @@ -0,0 +1,226 @@ +from sebs.cache import Cache +from sebs.faas.config import Credentials, Resources, Config +from sebs.utils import LoggingHandlers +from sebs.storage.config import MinioConfig + +from typing import cast, Optional + + +class KnativeCredentials(Credentials): + @staticmethod + def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Credentials: + return KnativeCredentials() + + def serialize(self) -> dict: + return {} + + +class KnativeResources(Resources): + def __init__( + self, + registry: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + registry_updated: bool = False, + ): + super().__init__(name="knative") + self._docker_registry = registry if registry != "" else None + self._docker_username = username if username != "" else None + self._docker_password = password if password != "" else None + self._registry_updated = registry_updated + self._storage: Optional[MinioConfig] = None + self._storage_updated = False + + @staticmethod + def typename() -> str: + return "Knative.Resources" + + @property + def docker_registry(self) -> Optional[str]: + return self._docker_registry + + @property + def docker_username(self) -> Optional[str]: + return self._docker_username + + @property + def docker_password(self) -> Optional[str]: + return self._docker_password + + @property + def storage_config(self) -> Optional[MinioConfig]: + return self._storage + + @property + def storage_updated(self) -> bool: + return self._storage_updated + + @property + def registry_updated(self) -> bool: + return self._registry_updated + + @staticmethod + def initialize(res: Resources, dct: dict): + ret = cast(KnativeResources, res) + ret._docker_registry = dct["registry"] + ret._docker_username = dct["username"] + ret._docker_password = dct["password"] + + @staticmethod + def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources: + + cached_config = cache.get_config("knative") + ret = KnativeResources() + if cached_config: + super(KnativeResources, KnativeResources).initialize( + ret, cached_config["resources"] + ) + + # Check for new config - overrides but check if it's different + if "docker_registry" in config: + + KnativeResources.initialize(ret, config["docker_registry"]) + ret.logging.info("Using user-provided Docker registry for Knative.") + ret.logging_handlers = handlers + + # check if there has been an update + if not ( + cached_config + and "resources" in cached_config + and "docker" in cached_config["resources"] + and cached_config["resources"]["docker"] == config["docker_registry"] + ): + ret._registry_updated = True + + # Load cached values + elif ( + cached_config + and "resources" in cached_config + and "docker" in cached_config["resources"] + ): + KnativeResources.initialize(ret, cached_config["resources"]["docker"]) + ret.logging_handlers = handlers + ret.logging.info("Using cached Docker registry for Knative") + else: + ret = KnativeResources() + ret.logging.info("Using default Docker registry for Knative.") + ret.logging_handlers = handlers + ret._registry_updated = True + + # Check for new config + if "storage" in config: + ret._storage = MinioConfig.deserialize(config["storage"]) + ret.logging.info("Using user-provided configuration of storage for Knative.") + + # check if there has been an update + if not ( + cached_config + and "resources" in cached_config + and "storage" in cached_config["resources"] + and cached_config["resources"]["storage"] == config["storage"] + ): + ret.logging.info( + "User-provided configuration is different from cached storage, " + "we will update existing Knative actions." + ) + ret._storage_updated = True + + # Load cached values + elif ( + cached_config + and "resources" in cached_config + and "storage" in cached_config["resources"] + ): + ret._storage = MinioConfig.deserialize(cached_config["resources"]["storage"]) + ret.logging.info("Using cached configuration of storage for Knative.") + + return ret + + def update_cache(self, cache: Cache): + super().update_cache(cache) + cache.update_config( + val=self.docker_registry, keys=["knative", "resources", "docker", "registry"] + ) + cache.update_config( + val=self.docker_username, keys=["knative", "resources", "docker", "username"] + ) + cache.update_config( + val=self.docker_password, keys=["knative", "resources", "docker", "password"] + ) + if self._storage: + self._storage.update_cache(["knative", "resources", "storage"], cache) + + def serialize(self) -> dict: + out: dict = { + **super().serialize(), + "docker_registry": self.docker_registry, + "docker_username": self.docker_username, + "docker_password": self.docker_password, + } + if self._storage: + out = {**out, "storage": self._storage.serialize()} + return out + + +class KnativeConfig(Config): + name: str + shutdownStorage: bool + cache: Cache + + def __init__(self, config: dict, cache: Cache): + super().__init__(name="knative") + self._credentials = KnativeCredentials() + self._resources = KnativeResources() + self.shutdownStorage = config["shutdownStorage"] + self.removeCluster = config["removeCluster"] + self.knative_exec = config["knativeExec"] + self.knative_bypass_security = config["knativeBypassSecurity"] + self.experimentalManifest = config["experimentalManifest"] + self.cache = cache + + @property + def credentials(self) -> KnativeCredentials: + return self._credentials + + @property + def resources(self) -> KnativeResources: + return self._resources + + @staticmethod + def initialize(cfg: Config, dct: dict): + cfg._region = dct["region"] + + def serialize(self) -> dict: + return { + "name": self._name, + "region": self._region, + "shutdownStorage": self.shutdownStorage, + "removeCluster": self.removeCluster, + "knativeExec": self.knative_exec, + "knativeBypassSecurity": self.knative_bypass_security, + "experimentalManifest": self.experimentalManifest, + "credentials": self._credentials.serialize(), + "resources": self._resources.serialize(), + } + + @staticmethod + def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config: + cached_config = cache.get_config("knative") + resources = cast( + KnativeResources, KnativeResources.deserialize(config, cache, handlers) + ) + + res = KnativeConfig(config, cached_config) + res.logging_handlers = handlers + res._resources = resources + return res + + def update_cache(self, cache: Cache): + cache.update_config(val=self.shutdownStorage, keys=["knative", "shutdownStorage"]) + cache.update_config(val=self.removeCluster, keys=["knative", "removeCluster"]) + cache.update_config(val=self.knative_exec, keys=["knative", "knativeExec"]) + cache.update_config(val=self.knative_bypass_security, keys=["knative", "knativeBypassSecurity"]) + cache.update_config( + val=self.experimentalManifest, keys=["knative", "experimentalManifest"] + ) + self.resources.update_cache(cache) diff --git a/sebs/knative/function.py b/sebs/knative/function.py new file mode 100644 index 00000000..aa562c6e --- /dev/null +++ b/sebs/knative/function.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +from typing import cast, Optional +from dataclasses import dataclass + +from sebs.benchmark import Benchmark +from sebs.faas.function import Function, FunctionConfig, Runtime +from sebs.storage.config import MinioConfig +from sebs.knative.trigger import LibraryTrigger, HTTPTrigger + + +@dataclass +class KnativeFunctionConfig(FunctionConfig): + """ + Configuration class for Knative function specific configurations. + + Attributes: + docker_image (str): Docker image for the function. + namespace (str): Kubernetes namespace where the function is deployed (default is 'default'). + storage (Optional[MinioConfig]): Optional MinioConfig object for storage configuration. + """ + + docker_image: str = "" + namespace: str = "default" + storage: Optional[MinioConfig] = None + + @staticmethod + def deserialize(data: dict) -> KnativeFunctionConfig: + """ + Deserialize data from dictionary into KnativeFunctionConfig object. + + Args: + data (dict): Dictionary containing serialized data. + + Returns: + KnativeFunctionConfig: Deserialized KnativeFunctionConfig object. + """ + keys = list(KnativeFunctionConfig.__dataclass_fields__.keys()) + data = {k: v for k, v in data.items() if k in keys} + data["runtime"] = Runtime.deserialize(data["runtime"]) + if "storage" in data: + data["storage"] = MinioConfig.deserialize(data["storage"]) + return KnativeFunctionConfig(**data) + + def serialize(self) -> dict: + """ + Serialize KnativeFunctionConfig object into dictionary. + + Returns: + dict: Dictionary containing serialized data. + """ + return self.__dict__ + + @staticmethod + def from_benchmark(benchmark: Benchmark) -> KnativeFunctionConfig: + """ + Create KnativeFunctionConfig object from a benchmark. + + Args: + benchmark (Benchmark): Benchmark object. + + Returns: + KnativeFunctionConfig: Initialized KnativeFunctionConfig object. + """ + return super(KnativeFunctionConfig, KnativeFunctionConfig)._from_benchmark( + benchmark, KnativeFunctionConfig + ) + + +class KnativeFunction(Function): + """ + Class representing a Knative function. + + Attributes: + name (str): Name of the function. + benchmark (str): Benchmark associated with the function. + code_package_hash (str): Hash of the code package associated with the function. + cfg (KnativeFunctionConfig): Configuration object for the function. + """ + + def __init__( + self, name: str, benchmark: str, code_package_hash: str, cfg: KnativeFunctionConfig + ): + """ + Initialize KnativeFunction object. + + Args: + name (str): Name of the function. + benchmark (str): Benchmark associated with the function. + code_package_hash (str): Hash of the code package associated with the function. + cfg (KnativeFunctionConfig): Configuration object for the function. + """ + super().__init__(benchmark, name, code_package_hash, cfg) + + @property + def config(self) -> KnativeFunctionConfig: + """ + Get the configuration object of the function. + + Returns: + KnativeFunctionConfig: Configuration object of the function. + """ + return cast(KnativeFunctionConfig, self._cfg) + + @staticmethod + def typename() -> str: + """ + Return the typename of the KnativeFunction class. + + Returns: + str: Typename of the KnativeFunction class. + """ + return "Knative.Function" + + def serialize(self) -> dict: + """ + Serialize KnativeFunction object into dictionary. + + Returns: + dict: Dictionary containing serialized data. + """ + serialized_data = super().serialize() + serialized_data["config"] = self._cfg.serialize() + return serialized_data + + @staticmethod + def deserialize(cached_config: dict) -> KnativeFunction: + """ + Deserialize dictionary into KnativeFunction object. + + Args: + cached_config (dict): Dictionary containing serialized data. + + Returns: + KnativeFunction: Deserialized KnativeFunction object. + """ + cfg = KnativeFunctionConfig.deserialize(cached_config["config"]) + ret = KnativeFunction( + cached_config["name"], cached_config["benchmark"], cached_config["hash"], cfg + ) + for trigger in cached_config["triggers"]: + trigger_type = cast( + Trigger, + {"Library": LibraryTrigger, "HTTP": HTTPTrigger}.get(trigger["type"]), + ) + assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) + ret.add_trigger(trigger_type.deserialize(trigger)) + return ret diff --git a/sebs/knative/knative.py b/sebs/knative/knative.py new file mode 100644 index 00000000..0a9160f3 --- /dev/null +++ b/sebs/knative/knative.py @@ -0,0 +1,114 @@ +from sebs.faas.system import System +from sebs.faas.function import Function, Trigger, ExecutionResult +from sebs.faas.storage import PersistentStorage +from sebs.benchmark import Benchmark +from sebs.config import SeBSConfig +from sebs.cache import Cache +from sebs.utils import LoggingHandlers +from sebs.faas.config import Resources +from typing import Dict, Tuple, Type, List, Optional +import docker +import uuid + +from .config import KnativeConfig + +class KnativeSystem(System): + def __init__(self, system_config: SeBSConfig, cache_client: Cache, docker_client: docker.client, logger_handlers: LoggingHandlers): + super().__init__(system_config, cache_client, docker_client) + # Initialize any additional Knative-specific attributes here + _config: KnativeConfig + + @property + def config(self) -> KnativeConfig: + # Return the configuration specific to Knative + return self._config + + @staticmethod + def function_type() -> Type[Function]: + # Return the specific function type for Knative + return Function + + def get_storage(self, replace_existing: bool = False) -> PersistentStorage: + # Implementation of persistent storage retrieval for Knative + # This might involve creating a persistent volume or bucket in Knative's ecosystem + pass + + def package_code( + self, + directory: str, + language_name: str, + language_version: str, + benchmark: str, + is_cached: bool, + ) -> Tuple[str, int]: + """ + Package code for Knative platform by building a Docker image. + + Args: + - directory: Directory where the function code resides. + - language_name: Name of the programming language (e.g., Python). + - language_version: Version of the programming language. + - benchmark: Identifier for the benchmark or function. + - is_cached: Flag indicating if the code is cached. + + Returns: + - Tuple containing the Docker image name (tag) and its size. + """ + + # Generate a unique Docker image name/tag for this function + docker_image_name = f"{benchmark}:{language_version}" + + # Build Docker image from the specified directory + image, _ = self._docker_client.images.build(path=directory, tag=docker_image_name) + + # Retrieve size of the Docker image + image_size = image.attrs['Size'] + + # Return the Docker image name (tag) and its size + return docker_image_name, image_size + + + def create_function(self, code_package: Benchmark, func_name: str) -> Function: + # Implementation for creating functions + return function + + def cached_function(self, function: Function): + # Implementation of retrieving cached function details for Knative + pass + + def update_function(self, function: Function, code_package: Benchmark): + # Implementation of function update for Knative + # might involve updating the Knative service with a new Docker image + pass + + def update_function_configuration(self, cached_function: Function, benchmark: Benchmark): + # Implementation of updating function configuration for Knative + pass + + def default_function_name(self, code_package: Benchmark) -> str: + # Implementation of default function naming for Knative + return f"{code_package.name}-{code_package.language_name}-{code_package.language_version}" + + def enforce_cold_start(self, functions: List[Function], code_package: Benchmark): + # Implementation of cold start enforcement for Knative + # I am assuiming this might involve deleting and redeploying the service to force a cold start + pass + + def download_metrics(self, function_name: str, start_time: int, end_time: int, requests: Dict[str, ExecutionResult], metrics: dict): + # Implementation of metric downloading for Knative + # Here I can review the knative inbuilt metric tool (flag) need to check + pass + + def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger: + # Implementation of trigger creation for Knative + # have to involve in setting up HTTP routes or event sources + trigger = Trigger(name=f"{function.name}-trigger", type=trigger_type) + return trigger + + def shutdown(self) -> None: + # Clean up any resources or connections + pass + + @staticmethod + def name() -> str: + return "Knative" diff --git a/sebs/knative/storage.py b/sebs/knative/storage.py new file mode 100644 index 00000000..0ef1c827 --- /dev/null +++ b/sebs/knative/storage.py @@ -0,0 +1,25 @@ +import docker +from sebs.faas.config import Resources +from sebs.storage import minio +from sebs.storage.config import MinioConfig +from sebs.cache import Cache + +class KnativeMinio(minio.Minio): + @staticmethod + def deployment_name() -> str: + return "knative" + + def __init__( + self, + docker_client: docker.client, + cache_client: Cache, + res: Resources, + replace_existing: bool, + ): + super().__init__(docker_client, cache_client, res, replace_existing) + + @staticmethod + def deserialize( + cached_config: MinioConfig, cache_client: Cache, resources: Resources + ) -> "KnativeMinio": + return super(KnativeMinio, KnativeMinio)._deserialize(cached_config, cache_client, resources, KnativeMinio) diff --git a/sebs/knative/trigger.py b/sebs/knative/trigger.py new file mode 100644 index 00000000..22c81568 --- /dev/null +++ b/sebs/knative/trigger.py @@ -0,0 +1,295 @@ +import concurrent.futures +import datetime +import json +import requests +import subprocess +import time +from typing import Dict, List, Optional # noqa + +from sebs.faas.function import ExecutionResult, Trigger + + +class LibraryTrigger(Trigger): + """ + Trigger implementation for invoking a Knative service using port forwarding and curl. + + Attributes: + function_name (str): The name of the function to invoke. + pod_name (str): The name of the Kubernetes pod where the function is deployed. + namespace (str): The Kubernetes namespace where the pod is deployed (default is 'default'). + """ + + def __init__(self, function_name: str, pod_name: str, namespace: str = "default"): + """ + Initialize the LibraryTrigger with the function name, pod name, and namespace. + + Args: + function_name (str): The name of the function to invoke. + pod_name (str): The name of the Kubernetes pod where the function is deployed. + namespace (str, optional): The Kubernetes namespace where the pod is deployed (default is 'default'). + """ + super().__init__() + self.function_name = function_name + self.pod_name = pod_name + self.namespace = namespace + + @staticmethod + def trigger_type() -> "Trigger.TriggerType": + """ + Return the type of trigger (LibraryTrigger). + + Returns: + Trigger.TriggerType: The trigger type (LibraryTrigger). + """ + return Trigger.TriggerType.LIBRARY + + @staticmethod + def get_curl_command(payload: dict) -> List[str]: + """ + Generate a curl command for invoking the function. + + Args: + payload (dict): The payload data to send with the request. + + Returns: + List[str]: The curl command as a list of strings. + """ + return [ + "curl", + "-X", + "POST", + "http://localhost:8080/handle", + "-d", + json.dumps(payload), + "-H", + "Content-Type: application/json", + ] + + def sync_invoke(self, payload: dict) -> ExecutionResult: + """ + Synchronously invoke the function using port forwarding and curl. + + Args: + payload (dict): The payload data to send with the request. + + Returns: + ExecutionResult: The result of the function invocation. + """ + port_forward_cmd = [ + "kubectl", + "port-forward", + f"pod/{self.pod_name}", + "8080:8080", + "-n", + self.namespace, + ] + + # Start port forwarding + port_forward_proc = subprocess.Popen( + port_forward_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + time.sleep(2) # Give some time for port forwarding to start + + command = self.get_curl_command(payload) + error = None + try: + begin = datetime.datetime.now() + response = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + ) + end = datetime.datetime.now() + parsed_response = response.stdout.decode("utf-8") + except (subprocess.CalledProcessError, FileNotFoundError) as e: + end = datetime.datetime.now() + error = e + + # Stop port forwarding + port_forward_proc.terminate() + + knative_result = ExecutionResult.from_times(begin, end) + if error is not None: + self.logging.error(f"Invocation of {self.function_name} failed!") + knative_result.stats.failure = True + return knative_result + + return_content = json.loads(parsed_response) + knative_result.parse_benchmark_output(return_content) + return knative_result + + def async_invoke(self, payload: dict) -> concurrent.futures.Future: + """ + Asynchronously invoke the function using port forwarding and curl. + + Args: + payload (dict): The payload data to send with the request. + + Returns: + concurrent.futures.Future: A future representing the asynchronous invocation. + """ + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut + + def serialize(self) -> dict: + """ + Serialize the trigger configuration. + + Returns: + dict: A dictionary representing the serialized trigger configuration. + """ + return { + "type": "Library", + "name": self.function_name, + "pod_name": self.pod_name, + "namespace": self.namespace, + } + + @staticmethod + def deserialize(obj: dict) -> Trigger: + """ + Deserialize a dictionary into a LibraryTrigger object. + + Args: + obj (dict): The dictionary containing the serialized trigger configuration. + + Returns: + Trigger: A LibraryTrigger object instantiated from the serialized data. + """ + return LibraryTrigger(obj["name"], obj["pod_name"], obj["namespace"]) + + @staticmethod + def typename() -> str: + """ + Return the typename of the trigger (Knative.LibraryTrigger). + + Returns: + str: The typename of the trigger. + """ + return "Knative.LibraryTrigger" + + +class HTTPTrigger(Trigger): + """ + Trigger implementation for invoking a Knative service via HTTP. + + Attributes: + function_name (str): The name of the function to invoke. + url (str): The URL of the Knative service endpoint. + """ + + def __init__(self, function_name: str, url: str): + """ + Initialize the HTTPTrigger with the function name and service URL. + + Args: + function_name (str): The name of the function to invoke. + url (str): The URL of the Knative service endpoint. + """ + super().__init__() + self.function_name = function_name + self.url = url + + @staticmethod + def typename() -> str: + """ + Return the typename of the trigger (Knative.HTTPTrigger). + + Returns: + str: The typename of the trigger. + """ + return "Knative.HTTPTrigger" + + @staticmethod + def trigger_type() -> Trigger.TriggerType: + """ + Return the type of trigger (HTTPTrigger). + + Returns: + Trigger.TriggerType: The trigger type (HTTPTrigger). + """ + return Trigger.TriggerType.HTTP + + def sync_invoke(self, payload: dict) -> ExecutionResult: + """ + Synchronously invoke the function via HTTP POST request. + + Args: + payload (dict): The payload data to send with the request. + + Returns: + ExecutionResult: The result of the function invocation. + """ + self.logging.debug(f"Invoke function {self.url}") + return self._http_invoke(payload, self.url, False) + + def async_invoke(self, payload: dict) -> concurrent.futures.Future: + """ + Asynchronously invoke the function via HTTP POST request. + + Args: + payload (dict): The payload data to send with the request. + + Returns: + concurrent.futures.Future: A future representing the asynchronous invocation. + """ + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut + + def _http_invoke(self, payload: dict, url: str, async_invoke: bool) -> ExecutionResult: + """ + Helper method for invoking the function via HTTP POST request. + + Args: + payload (dict): The payload data to send with the request. + url (str): The URL of the Knative service endpoint. + async_invoke (bool): Whether the invocation is asynchronous (not used in this method). + + Returns: + ExecutionResult: The result of the function invocation. + """ + headers = {'Content-Type': 'application/json'} + error = None + try: + begin = datetime.datetime.now() + response = requests.post(url, json=payload, headers=headers) + end = datetime.datetime.now() + response.raise_for_status() + parsed_response = response.json() + except (requests.RequestException, ValueError) as e: + end = datetime.datetime.now() + error = e + + knative_result = ExecutionResult.from_times(begin, end) + if error is not None: + self.logging.error(f"HTTP invocation of {self.function_name} failed!") + knative_result.stats.failure = True + return knative_result + + knative_result.parse_benchmark_output(parsed_response) + return knative_result + + def serialize(self) -> dict: + """ + Serialize the trigger configuration. + + Returns: + dict: A dictionary representing the serialized trigger configuration. + """ + return {"type": "HTTP", "fname": self.function_name, "url": self.url} + + @staticmethod + def deserialize(obj: dict) -> Trigger: + """ + Deserialize a dictionary into an HTTPTrigger object. + + Args: + obj (dict): The dictionary containing the serialized trigger configuration. + + Returns: + Trigger: An HTTPTrigger object instantiated from the serialized data. + """ + return HTTPTrigger(obj["fname"], obj["url"]) diff --git a/sebs/types.py b/sebs/types.py index 2f26117e..aafbac84 100644 --- a/sebs/types.py +++ b/sebs/types.py @@ -7,6 +7,7 @@ class Platforms(str, Enum): GCP = "gcp" LOCAL = "local" OPENWHISK = "openwhisk" + KNATIVE = "knative" class Storage(str, Enum):