Pipeline Editor: Allow for configuration of shared memory size #2942

Merged
merged 42 commits into from
Oct 18, 2022
63a9334
Allow for configuration of shared memory size
ptitzler Sep 28, 2022
43c2819
add new property to snapshots
ptitzler Sep 28, 2022
0799627
Add validation tests
ptitzler Sep 28, 2022
321f57e
Update handlers test to include new property
ptitzler Sep 28, 2022
71bea8b
Remove extra spaces from snapshots
kiersten-stokes Sep 28, 2022
30b2975
Update UI and replace default with placeholder
Sep 28, 2022
f6aa5b3
Fix alignment for error indicator
Sep 28, 2022
d304d06
Update padding
Sep 28, 2022
715aff1
Fix additional properties for handler test
kiersten-stokes Sep 28, 2022
d2535bc
Change 'size' back to optional now that UI changes are in
kiersten-stokes Sep 28, 2022
b4d35ad
Update snapshots yet again
kiersten-stokes Sep 29, 2022
fb3cb8c
Add support for airflow
ptitzler Sep 29, 2022
8acc2c0
Propagate in case of any falsy size values
kiersten-stokes Sep 29, 2022
60d4878
Fix parm bug
ptitzler Sep 29, 2022
7f600fc
Merge component_parameter.py with main and kiersten-stokes:obj-proper…
kiersten-stokes Oct 7, 2022
b4eca09
Merge component_parameter.py with recent changes in kiersten-stokes:o…
kiersten-stokes Oct 7, 2022
92a47fd
Merge branch 'main' into add-shared-mem-configuration-support
ptitzler Oct 10, 2022
6946c78
Update documentation to reflect that units cannot be specified
ptitzler Oct 10, 2022
2e7b601
Reference new method name and clean up property rendering
ptitzler Oct 10, 2022
eec7a77
FIx linting error
ptitzler Oct 10, 2022
504a00c
Fix tests that failed due to supported units changes
ptitzler Oct 10, 2022
07b9bcc
Fix failing kfp component parser test
ptitzler Oct 10, 2022
dabdef7
Fix linting
ptitzler Oct 10, 2022
40e0ecf
Merge branch 'main' of github.com:ptitzler/elyra into add-shared-mem-…
ptitzler Oct 11, 2022
e585224
Update test resources and revert invalid merge change
ptitzler Oct 11, 2022
0e808a1
Fix failing server tests
ptitzler Oct 11, 2022
296fc3b
Fix cypress test assets
ptitzler Oct 11, 2022
a20865c
Add placeholder for size attribute
kiersten-stokes Oct 11, 2022
f44e45d
Fix display of units
ptitzler Oct 12, 2022
4245889
Re-add missing method and update required property settings
ptitzler Oct 12, 2022
fc2aefb
Update pipeline-editor/pipeline-services dependency
ptitzler Oct 12, 2022
0d5116f
Merge branch 'main' of github.com:ptitzler/elyra into add-shared-mem-…
ptitzler Oct 13, 2022
d9b0b25
Update code sandbox
Oct 13, 2022
8ec2d2b
Update codesandbox
Oct 14, 2022
366d5a8
Update incorrect typehint unrelated to this pull request
akchinSTC Oct 17, 2022
06c5596
Update docs/source/user_guide/pipelines.md
ptitzler Oct 17, 2022
e9f18da
Fix quoting in typehint
akchinSTC Oct 17, 2022
732711d
Fix set definition
ptitzler Oct 17, 2022
fa60dfa
Merge branch 'add-shared-mem-configuration-support' of github.com:pti…
ptitzler Oct 17, 2022
f30075b
Remove unnecessary methods and update type hints
kiersten-stokes Oct 17, 2022
fec6cff
Update pipeline editor package
akchinSTC Oct 18, 2022
b6fa88c
Bring subclass type hints in line with base class
kiersten-stokes Oct 18, 2022
10 changes: 10 additions & 0 deletions docs/source/user_guide/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ Each pipeline node is configured using properties. Default node properties are a
- [Kubernetes tolerations](#kubernetes-tolerations)
- [Kubernetes pod annotations](#kubernetes-pod-annotations)
- [Kubernetes pod labels](#kubernetes-pod-labels)
- [Shared memory size](#shared-memory-size)

**Default properties that apply only to generic nodes**

Expand Down Expand Up @@ -157,6 +158,7 @@ Nodes that are implemented using [generic components](pipeline-components.html#g
- [Kubernetes tolerations](#kubernetes-tolerations)
- [Kubernetes pod annotations](#kubernetes-pod-annotations)
- [Kubernetes pod labels](#kubernetes-pod-labels)
- [Shared memory size](#shared-memory-size)

##### Configuring custom nodes

Expand All @@ -167,6 +169,7 @@ Nodes that are implemented using [custom components](pipeline-components.html#cu
- [Kubernetes pod annotations](#kubernetes-pod-annotations)
- [Kubernetes pod labels](#kubernetes-pod-labels)
- [Disable node caching](#disable-node-caching)
- [Shared memory size](#shared-memory-size)

#### Defining dependencies between nodes

Expand Down Expand Up @@ -290,6 +293,13 @@ The following alphabetically sorted list identifies the node properties that are
- The value is ignored when the pipeline is executed locally.
- A default runtime image can also be set in the pipeline properties tab. If a default image is set, the **Runtime Image** property in the node properties tab will indicate that a pipeline default is set. Individual nodes can override the pipeline default value.
- Example: `TensorFlow 2.0`

##### Shared memory size

Shared memory to be allocated on the pod where the component is executed.
- Format:
  - _Memory size_: Custom shared memory size in gigabytes (10<sup>9</sup> bytes). The Kubernetes default is used if set to zero.
- Shared memory size is ignored when the pipeline is executed with the `local` runtime option.
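
To make the documented behavior concrete, here is a minimal sketch (not Elyra code) of how a size in gigabytes could be turned into a memory-backed `emptyDir` volume definition. The helper name `shared_memory_volume` is hypothetical. The `G` suffix (the Kubernetes quantity suffix for 10<sup>9</sup> bytes) and the `shm` volume name are taken from the processor changes in this pull request.

```python
from typing import Any, Dict, Optional


def shared_memory_volume(size: Optional[float], units: str = "G") -> Optional[Dict[str, Any]]:
    """Return a memory-backed emptyDir volume spec, or None if no custom size is set.

    Hypothetical helper for illustration only; it is not part of Elyra.
    """
    if not size:  # None, 0, or "" means: keep the Kubernetes default
        return None
    return {
        "name": "shm",
        "emptyDir": {"medium": "Memory", "sizeLimit": f"{size}{units}"},
    }


print(shared_memory_volume(2))  # a volume spec with sizeLimit "2G"
print(shared_memory_volume(0))  # None: no custom volume is created
```

A size of zero yields no volume at all, which matches the statement above that the Kubernetes default applies in that case.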

### Running pipelines

Expand Down
34 changes: 31 additions & 3 deletions elyra/pipeline/airflow/processor_airflow.py
Expand Up @@ -41,6 +41,7 @@
from elyra.metadata.schemaspaces import Runtimes
from elyra.pipeline import pipeline_constants
from elyra.pipeline.component_catalog import ComponentCache
from elyra.pipeline.component_parameter import CustomSharedMemorySize
from elyra.pipeline.component_parameter import ElyraProperty
from elyra.pipeline.component_parameter import ElyraPropertyList
from elyra.pipeline.component_parameter import KubernetesAnnotation
Expand Down Expand Up @@ -107,7 +108,7 @@ def __init__(self, **kwargs):
self.class_import_map[parts[1]] = f"from {parts[0]} import {parts[1]}"
self.log.debug(f"class_package_map = {self.class_import_map}")

- def process(self, pipeline: Pipeline) -> None:
+ def process(self, pipeline: Pipeline) -> "AirflowPipelineProcessorResponse":
"""
Submit the pipeline for execution on Apache Airflow.
"""
Expand Down Expand Up @@ -597,6 +598,12 @@ def render_volumes(self, elyra_parameters: Dict[str, ElyraProperty]) -> str:
for v in elyra_parameters.get(pipeline_constants.MOUNTED_VOLUMES, []):
str_to_render += f"""
Volume(name="{v.pvc_name}", configs={{"persistentVolumeClaim": {{"claimName": "{v.pvc_name}"}}}}),"""
# set custom shared memory size
shm = elyra_parameters.get(pipeline_constants.KUBERNETES_SHARED_MEM_SIZE)
if shm is not None and shm.size:
config = f"""configs={{"emptyDir": {{"medium": "Memory", "sizeLimit": "{shm.size}{shm.units}"}}}}"""
str_to_render += f"""
Volume(name="shm", {config}),"""
return dedent(str_to_render)

def render_mounts(self, elyra_parameters: Dict[str, ElyraProperty]) -> str:
Expand Down Expand Up @@ -713,17 +720,38 @@ def add_kubernetes_toleration(self, instance: KubernetesToleration, execution_ob
}
)

def add_custom_shared_memory_size(self, instance: CustomSharedMemorySize, execution_object: Any, **kwargs) -> None:
"""Add CustomSharedMemorySize instance to the execution object for the given runtime processor"""

if not instance.size:
return

if "volumes" not in execution_object:
execution_object["volumes"] = []
if "volume_mounts" not in execution_object:
execution_object["volume_mounts"] = []
execution_object["volumes"].append(
{
"name": "shm",
"emptyDir": {"medium": "Memory", "sizeLimit": f"{instance.size}{instance.units}"},
}
)
execution_object["volume_mounts"].append(
{"mountPath": "/dev/shm", "name": "shm", "sub_path": None, "read_only": False},
)

@property
def supported_properties(self) -> Set[str]:
"""A list of Elyra-owned properties supported by this runtime processor."""
- return [
+ return {
pipeline_constants.ENV_VARIABLES,
pipeline_constants.KUBERNETES_SECRETS,
pipeline_constants.MOUNTED_VOLUMES,
pipeline_constants.KUBERNETES_POD_ANNOTATIONS,
pipeline_constants.KUBERNETES_POD_LABELS,
pipeline_constants.KUBERNETES_TOLERATIONS,
- ]
+ pipeline_constants.KUBERNETES_SHARED_MEM_SIZE,
+ }


class AirflowPipelineProcessorResponse(RuntimePipelineProcessorResponse):
Expand Down
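
The Airflow processor emits Python source text for the DAG, so the volume definition is built with an f-string in which doubled braces stand for literal braces. The following is a simplified sketch of that rendering step under stated assumptions: `render_shm_volume` is a hypothetical stand-alone function, not the `render_volumes` method from the diff, and it omits the indentation handling of the original.

```python
from typing import Optional


def render_shm_volume(size: Optional[str], units: str = "G") -> str:
    """Render the source text for a shared memory Volume, or "" if size is falsy.

    Hypothetical helper for illustration; it mirrors the f-string used in the diff.
    """
    if not size:
        return ""
    # "{{" and "}}" produce literal braces; "{size}" and "{units}" are interpolated.
    config = f"""configs={{"emptyDir": {{"medium": "Memory", "sizeLimit": "{size}{units}"}}}}"""
    return f'Volume(name="shm", {config}),'


print(render_shm_volume("2"))
# Volume(name="shm", configs={"emptyDir": {"medium": "Memory", "sizeLimit": "2G"}}),
```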
110 changes: 99 additions & 11 deletions elyra/pipeline/component_parameter.py
Expand Up @@ -27,6 +27,7 @@
from typing import Optional
from typing import Set
from typing import TYPE_CHECKING
from typing import Union

# Prevent a circular reference by importing RuntimePipelineProcessor only during type-checking
if TYPE_CHECKING:
Expand All @@ -37,6 +38,7 @@
from elyra.pipeline.pipeline_constants import KUBERNETES_POD_ANNOTATIONS
from elyra.pipeline.pipeline_constants import KUBERNETES_POD_LABELS
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import KUBERNETES_SHARED_MEM_SIZE
from elyra.pipeline.pipeline_constants import KUBERNETES_TOLERATIONS
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
from elyra.util.kubernetes import is_valid_annotation_key
Expand Down Expand Up @@ -209,7 +211,7 @@ def get_schema(cls) -> Dict[str, Any]:
properties[attr.id]["default"] = attr.default_value
if attr.enum:
properties[attr.id]["enum"] = attr.enum
- if attr.placeholder:
+ if attr.placeholder is not None:
uihints[attr.id] = {"ui:placeholder": attr.placeholder}
if attr.required:
required_list.append(attr.id)
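
The switch from a truthiness check to an explicit `is not None` check matters for the new property, presumably because its `size` attribute uses `placeholder=0`, which is falsy. A minimal sketch of the difference, using a hypothetical `build_uihints` function rather than the actual `get_schema` implementation:

```python
from typing import Any, Dict, Optional


def build_uihints(attr_id: str, placeholder: Optional[Any]) -> Dict[str, Any]:
    """Return UI hints for one attribute (hypothetical, simplified helper)."""
    uihints: Dict[str, Any] = {}
    # "if placeholder:" would skip 0 and ""; "is not None" only skips a missing value.
    if placeholder is not None:
        uihints[attr_id] = {"ui:placeholder": placeholder}
    return uihints


print(build_uihints("size", 0))      # {'size': {'ui:placeholder': 0}}
print(build_uihints("units", None))  # {}
```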
Expand Down Expand Up @@ -281,7 +283,7 @@ def get_single_instance(cls, value: Optional[Any] = None) -> ElyraProperty | Non

@classmethod
def get_schema(cls) -> Dict[str, Any]:
- """Build the JSON schema for an Elyra-owned component property"""
+ """Build the JSON schema for this property"""
schema = super().get_schema()
schema["enum"] = ["True", "False"]
schema["uihints"] = {"ui:placeholder": "Use runtime default"}
Expand All @@ -298,6 +300,83 @@ def add_to_execution_object(self, runtime_processor: RuntimePipelineProcessor, e
runtime_processor.add_disable_node_caching(instance=self, execution_object=execution_object, **kwargs)


class CustomSharedMemorySize(ElyraProperty):
"""An ElyraProperty representing shared memory size for a node."""

applies_to_generic = True # custom shared mem size applies to generic components
applies_to_custom = True # custom shared mem size applies to custom components

property_id = KUBERNETES_SHARED_MEM_SIZE
property_display_name = "Shared Memory Size"
property_description = """Configure a custom shared memory size in
gigabytes (10^9 bytes) for the pod that executes a node. A custom
value is assigned if the size property value is a number greater than zero."""
property_attributes = [
PropertyAttribute(
attribute_id="size",
display_name="Memory Size (GB)",
placeholder=0,
input_type="number",
hidden=False,
required=False,
),
PropertyAttribute(
attribute_id="units",
display_name="Units",
input_type="string",
hidden=True,
required=False,
),
]

default_units = "G"

def __init__(self, size: str, units: str, **kwargs):
self.size = size
self.units = units or CustomSharedMemorySize.default_units

@classmethod
def get_schema(cls) -> Dict[str, Any]:
"""Build the JSON schema for an Elyra-owned component property"""
schema = super().get_schema()
schema["properties"]["size"]["minimum"] = 0
return schema

def to_dict(self) -> Dict[str, Any]:
"""Convert instance to a dict with relevant class attributes."""
dict_repr = {attr.id: getattr(self, attr.id, None) for attr in self.property_attributes}
return dict_repr

def get_value_for_display(self) -> Dict[str, Any]:
"""Get a representation of the instance to display in UI error messages."""
return self.to_dict()

def get_all_validation_errors(self) -> List[str]:
"""Validate this instance. If the size attribute is set and not zero it
must be a positive floating point number. The units attribute must be 'G'."""
validation_errors = []
# verify custom size
try:
if self.size:
size = float(self.size)
if size < 0:
raise ValueError()
except ValueError:
validation_errors.append(f"Shared memory size '{self.size}' must be a positive number.")
# verify units
if self.units not in ["G"]:
validation_errors.append(f"Shared memory size units '{self.units}' must be 'G'.")
return validation_errors

def add_to_execution_object(self, runtime_processor: RuntimePipelineProcessor, execution_object: Any, **kwargs):
"""Add CustomSharedMemorySize instance to the execution object for the given runtime processor"""
runtime_processor.add_custom_shared_memory_size(instance=self, execution_object=execution_object, **kwargs)

def should_discard(self) -> bool:
"""Ignore this CustomSharedMemorySize instance if no custom size is specified."""
return not self.size


class ElyraPropertyListItem(ElyraProperty):
"""
An Elyra-owned property that is meant to be a member of an ElyraOwnedPropertyList.
Expand All @@ -323,8 +402,8 @@ def get_key_for_dict_entry(self) -> str:
prop_key += f"{key_part}:" if key_attr != keys[-1] else key_part
return prop_key

- def get_value_for_dict_entry(self) -> str:
- """Returns the value to be used when constructing a dict from a list of classes."""
+ def get_value_for_dict_entry(self) -> Union[str, Dict[str, Any]]:
+ """Returns the value to be used when constructing a dict from a list of ElyraPropertyListItem."""
return self.to_dict()

def get_value_for_display(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -377,8 +456,11 @@ def get_schema(cls) -> Dict[str, Any]:
schema["uihints"].update({"canRefresh": True})
return schema

- def get_value_for_dict_entry(self) -> str:
- """Returns the value to be used when constructing a dict from a list of classes."""
+ def get_value_for_dict_entry(self) -> Union[str, Dict[str, Any]]:
+ """
+ Returns the value to be used when constructing a dict from a list of ElyraPropertyListItem.
+ A EnvironmentVariable dict entry will be of the form {self.env_var: self.value}
+ """
return self.value

def should_discard(self) -> bool:
Expand Down Expand Up @@ -593,8 +675,11 @@ def __init__(self, key, value, **kwargs):
self.key = key
self.value = value

- def get_value_for_dict_entry(self) -> str:
- """Returns the value to be used when constructing a dict from a list of classes."""
+ def get_value_for_dict_entry(self) -> Union[str, Dict[str, Any]]:
+ """
+ Returns the value to be used when constructing a dict from a list of ElyraPropertyListItem.
+ A KubernetesAnnotation dict entry will be of the form {self.key: self.value}
+ """
return self.value

def get_all_validation_errors(self) -> List[str]:
Expand Down Expand Up @@ -651,8 +736,11 @@ def __init__(self, key, value, **kwargs):
self.key = key
self.value = value

- def get_value_for_dict_entry(self) -> str:
- """Returns the value to be used when constructing a dict from a list of classes."""
+ def get_value_for_dict_entry(self) -> Union[str, Dict[str, Any]]:
+ """
+ Returns the value to be used when constructing a dict from a list of ElyraPropertyListItem.
+ A KubernetesLabel dict entry will be of the form {self.key: self.value}
+ """
return self.value

def get_all_validation_errors(self) -> List[str]:
Expand Down Expand Up @@ -790,7 +878,7 @@ def to_dict(self: List[ElyraPropertyListItem], use_prop_as_value: bool = False)

prop_value = prop.get_value_for_dict_entry()
if use_prop_as_value:
- prop_value = prop # use of the property object itself as the value
+ prop_value = prop # use the property object itself as the value
prop_dict[prop_key] = prop_value

return prop_dict
Expand Down
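
The validation rules of `CustomSharedMemorySize` can be exercised in isolation. The sketch below restates the logic of `get_all_validation_errors` as a hypothetical free function, `validate_shared_memory`, so that it runs without Elyra installed. It is an illustration of the rules shown in the diff, not a replacement for the method.

```python
from typing import Any, List


def validate_shared_memory(size: Any, units: str) -> List[str]:
    """Return validation errors for a size/units pair (hypothetical helper)."""
    errors: List[str] = []
    # An unset or zero size is valid and means "use the Kubernetes default".
    try:
        if size:
            if float(size) < 0:
                raise ValueError()
    except ValueError:
        errors.append(f"Shared memory size '{size}' must be a positive number.")
    # Only gigabytes are accepted as units.
    if units not in ["G"]:
        errors.append(f"Shared memory size units '{units}' must be 'G'.")
    return errors


print(validate_shared_memory("2", "G"))    # []
print(validate_shared_memory("-1", "G"))   # one error about the size
print(validate_shared_memory("1.5", "M"))  # one error about the units
```

Note that a non-numeric string such as `"abc"` is reported through the same message as a negative number, because `float()` raises `ValueError` in that case.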
22 changes: 20 additions & 2 deletions elyra/pipeline/kfp/processor_kfp.py
Expand Up @@ -29,6 +29,7 @@
from kfp.dsl import PipelineConf
from kfp.aws import use_aws_secret # noqa H306
from kubernetes import client as k8s_client
from kubernetes.client import V1EmptyDirVolumeSource
from kubernetes.client import V1EnvVar
from kubernetes.client import V1EnvVarSource
from kubernetes.client import V1PersistentVolumeClaimVolumeSource
Expand All @@ -51,6 +52,7 @@
from elyra.metadata.schemaspaces import Runtimes
from elyra.pipeline import pipeline_constants
from elyra.pipeline.component_catalog import ComponentCache
from elyra.pipeline.component_parameter import CustomSharedMemorySize
from elyra.pipeline.component_parameter import DisableNodeCaching
from elyra.pipeline.component_parameter import ElyraProperty
from elyra.pipeline.component_parameter import ElyraPropertyList
Expand Down Expand Up @@ -775,6 +777,21 @@ def add_disable_node_caching(self, instance: DisableNodeCaching, execution_objec
if instance.selection:
execution_object.execution_options.caching_strategy.max_cache_staleness = "P0D"

def add_custom_shared_memory_size(self, instance: CustomSharedMemorySize, execution_object: Any, **kwargs) -> None:
"""Add CustomSharedMemorySize info to the execution object for the given runtime processor"""

if not instance.size:
return

volume = V1Volume(
name="shm",
empty_dir=V1EmptyDirVolumeSource(medium="Memory", size_limit=f"{instance.size}{instance.units}"),
)
if volume not in execution_object.volumes:
execution_object.add_volume(volume)

execution_object.container.add_volume_mount(V1VolumeMount(mount_path="/dev/shm", name="shm"))

def add_kubernetes_secret(self, instance: KubernetesSecret, execution_object: Any, **kwargs) -> None:
"""Add KubernetesSecret instance to the execution object for the given runtime processor"""
execution_object.container.add_env_variable(
Expand Down Expand Up @@ -825,15 +842,16 @@ def add_kubernetes_toleration(self, instance: KubernetesToleration, execution_ob
@property
def supported_properties(self) -> Set[str]:
"""A list of Elyra-owned properties supported by this runtime processor."""
- return [
+ return {
pipeline_constants.ENV_VARIABLES,
pipeline_constants.KUBERNETES_SECRETS,
pipeline_constants.MOUNTED_VOLUMES,
pipeline_constants.KUBERNETES_POD_ANNOTATIONS,
pipeline_constants.KUBERNETES_POD_LABELS,
pipeline_constants.KUBERNETES_TOLERATIONS,
pipeline_constants.DISABLE_NODE_CACHING,
- ]
+ pipeline_constants.KUBERNETES_SHARED_MEM_SIZE,
+ }


class KfpPipelineProcessorResponse(RuntimePipelineProcessorResponse):
Expand Down
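
The KFP variant of `add_custom_shared_memory_size` adds the `shm` volume only if an equal volume is not already attached, then mounts it at `/dev/shm`. The sketch below imitates that control flow with plain dataclasses so it runs without the `kfp` or `kubernetes` packages. `Volume`, `Op`, and `add_shared_memory` are hypothetical stand-ins, and the membership test relies on dataclass equality here rather than on the behavior of the real client models.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Volume:
    """Stand-in for a memory-backed emptyDir volume (hypothetical)."""
    name: str
    medium: str
    size_limit: str


@dataclass
class Op:
    """Stand-in for a pipeline operation that owns volumes and mounts (hypothetical)."""
    volumes: List[Volume] = field(default_factory=list)
    mounts: List[str] = field(default_factory=list)

    def add_volume(self, volume: Volume) -> None:
        self.volumes.append(volume)


def add_shared_memory(op: Op, size: Optional[float], units: str = "G") -> None:
    """Attach a shared memory volume to op unless size is falsy."""
    if not size:
        return
    volume = Volume(name="shm", medium="Memory", size_limit=f"{size}{units}")
    if volume not in op.volumes:  # skip volumes that compare equal
        op.add_volume(volume)
    op.mounts.append("/dev/shm")  # as in the diff, the mount is added unconditionally


op = Op()
add_shared_memory(op, 2)
add_shared_memory(op, 2)
print(len(op.volumes))  # 1
```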
1 change: 1 addition & 0 deletions elyra/pipeline/pipeline_constants.py
Expand Up @@ -23,6 +23,7 @@
KUBERNETES_POD_ANNOTATIONS = "kubernetes_pod_annotations"
KUBERNETES_POD_LABELS = "kubernetes_pod_labels"
DISABLE_NODE_CACHING = "disable_node_caching"
KUBERNETES_SHARED_MEM_SIZE = "kubernetes_shared_mem_size"
PIPELINE_META_PROPERTIES = ["name", "description", "runtime"]
# optional static prefix to be used when generating an object name for object storage
COS_OBJECT_PREFIX = "cos_object_prefix"
5 changes: 5 additions & 0 deletions elyra/pipeline/processor.py
Expand Up @@ -40,6 +40,7 @@
from elyra.metadata.manager import MetadataManager
from elyra.pipeline.component import Component
from elyra.pipeline.component_catalog import ComponentCache
from elyra.pipeline.component_parameter import CustomSharedMemorySize

Check notice (Code scanning / CodeQL): Cyclic import. Import of module elyra.pipeline.component_parameter begins an import cycle.
from elyra.pipeline.component_parameter import DisableNodeCaching
from elyra.pipeline.component_parameter import EnvironmentVariable
from elyra.pipeline.component_parameter import KubernetesAnnotation
Expand Down Expand Up @@ -602,6 +603,10 @@ def add_disable_node_caching(self, instance: DisableNodeCaching, execution_objec
"""Add DisableNodeCaching info to the execution object for the given runtime processor"""
pass

def add_custom_shared_memory_size(self, instance: CustomSharedMemorySize, execution_object: Any, **kwargs) -> None:
"""Add CustomSharedMemorySize info to the execution object for the given runtime processor"""
pass

def add_env_var(self, instance: EnvironmentVariable, execution_object: Any, **kwargs) -> None:
"""Add EnvironmentVariable instance to the execution object for the given runtime processor"""
pass
Expand Down
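
The no-op method on the base processor is one half of a double-dispatch arrangement: the property calls back into whichever processor it is given, and only processors that override the hook act on it. A minimal sketch of that pattern follows. The names `BaseProcessor`, `DictProcessor`, and `SharedMemory` are hypothetical and much simpler than the Elyra classes they stand in for.

```python
from typing import Any, Dict


class BaseProcessor:
    """Stand-in for the base runtime processor (hypothetical)."""

    def add_custom_shared_memory_size(self, instance: "SharedMemory", execution_object: Any, **kwargs) -> None:
        # Default: the runtime does not support the property, so nothing happens.
        pass


class DictProcessor(BaseProcessor):
    """Stand-in for a processor whose execution object is a dict (hypothetical)."""

    def add_custom_shared_memory_size(
        self, instance: "SharedMemory", execution_object: Dict[str, Any], **kwargs
    ) -> None:
        if not instance.size:
            return
        execution_object.setdefault("volumes", []).append(
            {"name": "shm", "emptyDir": {"medium": "Memory", "sizeLimit": f"{instance.size}{instance.units}"}}
        )


class SharedMemory:
    """Stand-in for the property class (hypothetical)."""

    def __init__(self, size: Any, units: str = "G"):
        self.size = size
        self.units = units

    def add_to_execution_object(self, runtime_processor: BaseProcessor, execution_object: Any, **kwargs) -> None:
        # The property delegates to the processor, which decides what to do.
        runtime_processor.add_custom_shared_memory_size(instance=self, execution_object=execution_object, **kwargs)


target: Dict[str, Any] = {}
SharedMemory(2).add_to_execution_object(BaseProcessor(), target)
print(target)  # {}
SharedMemory(2).add_to_execution_object(DictProcessor(), target)
print(target["volumes"][0]["emptyDir"]["sizeLimit"])  # 2G
```

With this arrangement a new property only needs a default no-op on the base class, and runtimes opt in by overriding the hook.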
6 changes: 3 additions & 3 deletions elyra/tests/pipeline/airflow/test_component_parser_airflow.py
Expand Up @@ -438,10 +438,10 @@ def test_parse_airflow_component_file_no_inputs():
no_input_op = parser.parse(catalog_entry)[0]
properties_json = ComponentCache.to_canvas_properties(no_input_op)

- # Properties JSON should only include the four parameters common to every
+ # Properties JSON should only include the five parameters common to every
# component: ('mounted_volumes', 'kubernetes_pod_annotations', 'kubernetes_pod_labels',
- # and 'kubernetes_tolerations')
- num_common_params = 4
+ # 'kubernetes_shared_mem_size', and 'kubernetes_tolerations')
+ num_common_params = 5
properties_from_json = [
prop
for prop in properties_json["properties"]["component_parameters"]["properties"].keys()
Expand Down