diff --git a/sky/__init__.py b/sky/__init__.py
index 4e720d63ce0..6000b207f8f 100644
--- a/sky/__init__.py
+++ b/sky/__init__.py
@@ -135,6 +135,7 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]):
 RunPod = clouds.RunPod
 Vsphere = clouds.Vsphere
 Fluidstack = clouds.Fluidstack
+Nebius = clouds.Nebius
 optimize = Optimizer.optimize
 
 __all__ = [
@@ -153,6 +154,7 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]):
     'SCP',
     'Vsphere',
     'Fluidstack',
+    'Nebius',
     'Optimizer',
     'OptimizeTarget',
     'backends',
diff --git a/sky/adaptors/nebius.py b/sky/adaptors/nebius.py
new file mode 100644
index 00000000000..1e66cdb0362
--- /dev/null
+++ b/sky/adaptors/nebius.py
@@ -0,0 +1,64 @@
+"""Nebius cloud adaptor."""
+import os
+
+from sky.adaptors import common
+
+NB_TENANT_ID_PATH = '~/.nebius/NB_TENANT_ID.txt'
+NEBIUS_IAM_TOKEN_PATH = '~/.nebius/NEBIUS_IAM_TOKEN.txt'
+
+MAX_RETRIES_TO_DISK_CREATE = 120
+MAX_RETRIES_TO_INSTANCE_READY = 120
+MAX_RETRIES_TO_DISK_DELETE = 120
+MAX_RETRIES_TO_INSTANCE_WAIT = 120  # Maximum number of retries
+
+POLL_INTERVAL = 5
+
+nebius = common.LazyImport(
+    'nebius',
+    import_error_message='Failed to import dependencies for Nebius AI Cloud. '
+    'Try running: pip install "skypilot[nebius]"')
+
+
+def request_error():
+    return nebius.aio.service_error.RequestError
+
+
+def compute():
+    # pylint: disable=import-outside-toplevel
+    from nebius.api.nebius.compute import v1 as compute_v1
+    return compute_v1
+
+
+def iam():
+    # pylint: disable=import-outside-toplevel
+    from nebius.api.nebius.iam import v1 as iam_v1
+    return iam_v1
+
+
+def nebius_common():
+    # pylint: disable=import-outside-toplevel
+    from nebius.api.nebius.common import v1 as common_v1
+    return common_v1
+
+
+def vpc():
+    # pylint: disable=import-outside-toplevel
+    from nebius.api.nebius.vpc import v1 as vpc_v1
+    return vpc_v1
+
+
+def get_iam_token():
+    with open(os.path.expanduser(NEBIUS_IAM_TOKEN_PATH),
+              encoding='utf-8') as file:
+        iam_token = file.read().strip()
+    return iam_token
+
+
+def get_tenant_id():
+    with open(os.path.expanduser(NB_TENANT_ID_PATH), encoding='utf-8') as file:
+        tenant_id = file.read().strip()
+    return tenant_id
+
+
+def sdk(credentials):
+    return nebius.sdk.SDK(credentials=credentials)
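For orientation, here is a minimal usage sketch of the adaptor above (illustrative, not part of the diff): `common.LazyImport` defers loading the `nebius` SDK until first attribute access, so `import sky` stays cheap when the `nebius` extra is not installed.

```python
# Illustrative sketch, not part of the diff. Assumes the credential files
# ~/.nebius/NEBIUS_IAM_TOKEN.txt and ~/.nebius/NB_TENANT_ID.txt exist.
from sky.adaptors import nebius

sdk = nebius.sdk(credentials=nebius.get_iam_token())
# The nebius SDK is imported lazily here, on first attribute access.
projects = nebius.iam().ProjectServiceClient(sdk).list(
    nebius.iam().ListProjectsRequest(parent_id=nebius.get_tenant_id())).wait()
print([p.metadata.id for p in projects.items])
```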
diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py
index bf92f442d2f..762d268075c 100644
--- a/sky/backends/backend_utils.py
+++ b/sky/backends/backend_utils.py
@@ -1044,6 +1044,7 @@ def _add_auth_to_cluster_config(cloud: clouds.Cloud, cluster_config_file: str):
             clouds.Paperspace,
             clouds.Azure,
             clouds.DO,
+            clouds.Nebius,
         )):
         config = auth.configure_ssh_info(config)
     elif isinstance(cloud, clouds.GCP):
diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index e2f7c997b09..7bdc1c3e402 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -182,7 +182,8 @@ def _get_cluster_config_template(cloud):
         clouds.RunPod: 'runpod-ray.yml.j2',
         clouds.Kubernetes: 'kubernetes-ray.yml.j2',
         clouds.Vsphere: 'vsphere-ray.yml.j2',
-        clouds.Fluidstack: 'fluidstack-ray.yml.j2'
+        clouds.Fluidstack: 'fluidstack-ray.yml.j2',
+        clouds.Nebius: 'nebius-ray.yml.j2'
     }
     return cloud_to_template[type(cloud)]
diff --git a/sky/clouds/__init__.py b/sky/clouds/__init__.py
index 24b805fe8bc..f3117b18af0 100644
--- a/sky/clouds/__init__.py
+++ b/sky/clouds/__init__.py
@@ -21,6 +21,7 @@
 from sky.clouds.ibm import IBM
 from sky.clouds.kubernetes import Kubernetes
 from sky.clouds.lambda_cloud import Lambda
+from sky.clouds.nebius import Nebius
 from sky.clouds.oci import OCI
 from sky.clouds.paperspace import Paperspace
 from sky.clouds.runpod import RunPod
@@ -49,6 +50,7 @@
     'ProvisionerVersion',
     'StatusVersion',
     'Fluidstack',
+    'Nebius',
     # Utility functions
     'cloud_in_iterable',
 ]
diff --git a/sky/clouds/nebius.py b/sky/clouds/nebius.py
new file mode 100644
index 00000000000..885e1a01354
--- /dev/null
+++ b/sky/clouds/nebius.py
@@ -0,0 +1,301 @@
+""" Nebius Cloud. """
+import logging
+import typing
+from typing import Dict, Iterator, List, Optional, Tuple, Union
+
+from sky import clouds
+from sky.adaptors import nebius
+from sky.clouds import service_catalog
+from sky.utils import resources_utils
+
+if typing.TYPE_CHECKING:
+    from sky import resources as resources_lib
+
+_CREDENTIAL_FILES = [
+    # Credential files for Nebius (full paths under ~/.nebius/).
+    nebius.NB_TENANT_ID_PATH,
+    nebius.NEBIUS_IAM_TOKEN_PATH
+]
+
+
+@clouds.CLOUD_REGISTRY.register
+class Nebius(clouds.Cloud):
+    """Nebius GPU Cloud"""
+    _REPR = 'Nebius'
+    _CLOUD_UNSUPPORTED_FEATURES = {
+        clouds.CloudImplementationFeatures.AUTO_TERMINATE:
+            ('Autodown is not supported: Nebius cannot delete the disk.'),
+        clouds.CloudImplementationFeatures.SPOT_INSTANCE:
+            ('Spot is not supported, as Nebius API does not implement spot.'),
+        clouds.CloudImplementationFeatures.CLONE_DISK_FROM_CLUSTER:
+            (f'Migrating disk is currently not supported on {_REPR}.'),
+        clouds.CloudImplementationFeatures.DOCKER_IMAGE:
+            (f'Docker image is currently not supported on {_REPR}. '
+             'You can try running docker command inside the '
+             '`run` section in task.yaml.'),
+        clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER:
+            (f'Custom disk tier is currently not supported on {_REPR}.'),
+        clouds.CloudImplementationFeatures.OPEN_PORTS:
+            (f'Opening ports is currently not supported on {_REPR}.'),
+    }
+    # Nebius instance names are limited to 63 characters (hostname limit).
+    # Reserve 8 characters for the `-worker` suffix appended by our
+    # provisioner, leaving 63 - 8 = 55.
+    _MAX_CLUSTER_NAME_LEN_LIMIT = 55
+    _regions: List[clouds.Region] = []
+
+    # Using the latest SkyPilot provisioner API to provision and check status.
+    PROVISIONER_VERSION = clouds.ProvisionerVersion.SKYPILOT
+    STATUS_VERSION = clouds.StatusVersion.SKYPILOT
+
+    @classmethod
+    def _cloud_unsupported_features(
+            cls) -> Dict[clouds.CloudImplementationFeatures, str]:
+        return cls._CLOUD_UNSUPPORTED_FEATURES
+
+    @classmethod
+    def _unsupported_features_for_resources(
+        cls, resources: 'resources_lib.Resources'
+    ) -> Dict[clouds.CloudImplementationFeatures, str]:
+        del resources  # unused
+        return cls._CLOUD_UNSUPPORTED_FEATURES
+
+    @classmethod
+    def _max_cluster_name_length(cls) -> Optional[int]:
+        return cls._MAX_CLUSTER_NAME_LEN_LIMIT
+
+    @classmethod
+    def regions_with_offering(cls, instance_type: str,
+                              accelerators: Optional[Dict[str, int]],
+                              use_spot: bool, region: Optional[str],
+                              zone: Optional[str]) -> List[clouds.Region]:
+        assert zone is None, 'Nebius does not support zones.'
+        del accelerators, zone  # unused
+        if use_spot:
+            return []
+        regions = service_catalog.get_region_zones_for_instance_type(
+            instance_type, use_spot, 'nebius')
+
+        if region is not None:
+            regions = [r for r in regions if r.name == region]
+        return regions
+    @classmethod
+    def get_vcpus_mem_from_instance_type(
+        cls,
+        instance_type: str,
+    ) -> Tuple[Optional[float], Optional[float]]:
+        logging.debug('Nebius cloud get vcpus mem: %s', cls._REPR)
+        return service_catalog.get_vcpus_mem_from_instance_type(
+            instance_type, clouds='nebius')
+
+    @classmethod
+    def zones_provision_loop(
+        cls,
+        *,
+        region: str,
+        num_nodes: int,
+        instance_type: str,
+        accelerators: Optional[Dict[str, int]] = None,
+        use_spot: bool = False,
+    ) -> Iterator[None]:
+        logging.debug('Nebius cloud zone provision loop: %s', cls._REPR)
+        del num_nodes  # unused
+        regions = cls.regions_with_offering(instance_type,
+                                            accelerators,
+                                            use_spot,
+                                            region=region,
+                                            zone=None)
+        for r in regions:
+            assert r.zones is None, r
+            yield r.zones
+
+    def instance_type_to_hourly_cost(self,
+                                     instance_type: str,
+                                     use_spot: bool,
+                                     region: Optional[str] = None,
+                                     zone: Optional[str] = None) -> float:
+        logging.debug('Nebius cloud instance type to hourly cost: %s',
+                      instance_type)
+        return service_catalog.get_hourly_cost(instance_type,
+                                               use_spot=use_spot,
+                                               region=region,
+                                               zone=zone,
+                                               clouds='nebius')
+
+    def accelerators_to_hourly_cost(self,
+                                    accelerators: Dict[str, int],
+                                    use_spot: bool,
+                                    region: Optional[str] = None,
+                                    zone: Optional[str] = None) -> float:
+        """Returns the hourly cost of the accelerators, in dollars/hour."""
+        logging.debug('Nebius cloud accelerators to hourly cost: 0')
+        del accelerators, use_spot, region, zone  # unused
+        return 0.0
+
+    def get_egress_cost(self, num_gigabytes: float) -> float:
+        logging.debug('Nebius cloud get egress cost: 0')
+        return 0.0
+
+    def __repr__(self):
+        return 'Nebius'
+
+    def is_same_cloud(self, other: clouds.Cloud) -> bool:
+        logging.debug('Nebius cloud is same: %s', self)
+        # Returns True if the two clouds are the same cloud type.
+        return isinstance(other, Nebius)
+    @classmethod
+    def get_default_instance_type(
+            cls,
+            cpus: Optional[str] = None,
+            memory: Optional[str] = None,
+            disk_tier: Optional[resources_utils.DiskTier] = None
+    ) -> Optional[str]:
+        """Returns the default instance type for Nebius."""
+        logging.debug('Nebius cloud default instance type: %s', cls._REPR)
+        return service_catalog.get_default_instance_type(cpus=cpus,
+                                                         memory=memory,
+                                                         disk_tier=disk_tier,
+                                                         clouds='nebius')
+
+    @classmethod
+    def get_accelerators_from_instance_type(
+        cls,
+        instance_type: str,
+    ) -> Optional[Dict[str, Union[int, float]]]:
+        return service_catalog.get_accelerators_from_instance_type(
+            instance_type, clouds='nebius')
+
+    @classmethod
+    def get_zone_shell_cmd(cls) -> Optional[str]:
+        logging.debug('Nebius cloud get zone shell cmd: None')
+        return None
+
+    def make_deploy_resources_variables(
+            self,
+            resources: 'resources_lib.Resources',
+            cluster_name: resources_utils.ClusterName,
+            region: 'clouds.Region',
+            zones: Optional[List['clouds.Zone']],
+            num_nodes: int,
+            dryrun: bool = False) -> Dict[str, Optional[str]]:
+        del dryrun, cluster_name
+        assert zones is None, ('Nebius does not support zones', zones)
+
+        r = resources
+        acc_dict = self.get_accelerators_from_instance_type(r.instance_type)
+        custom_resources = resources_utils.make_ray_custom_resources_str(
+            acc_dict)
+        # Instance types are of the form '<platform>_<preset>', e.g.
+        # 'gpu-h100-sxm_8gpu-128vcpu-1600gb'.
+        platform, _ = resources.instance_type.split('_')
+
+        if platform in ('cpu-d3', 'cpu-e2'):
+            image_family = 'ubuntu22.04-driverless'
+        elif platform in ('gpu-h100-sxm', 'gpu-h200-sxm', 'gpu-l40s-a'):
+            image_family = 'ubuntu22.04-cuda12'
+        else:
+            raise RuntimeError(f'Unsupported platform: {platform}')
+        return {
+            'instance_type': resources.instance_type,
+            'custom_resources': custom_resources,
+            'region': region.name,
+            'image_id': image_family,
+            # Nebius does not support specific zones.
+            'zones': None,
+        }
+
+    def _get_feasible_launchable_resources(
+        self, resources: 'resources_lib.Resources'
+    ) -> 'resources_utils.FeasibleResources':
+        """Returns a list of feasible resources for the given resources."""
+        if resources.instance_type is not None:
+            assert resources.is_launchable(), resources
+            resources = resources.copy(accelerators=None)
+            return resources_utils.FeasibleResources([resources], [], None)
+
+        def _make(instance_list):
+            resource_list = []
+            for instance_type in instance_list:
+                r = resources.copy(
+                    cloud=Nebius(),
+                    instance_type=instance_type,
+                    accelerators=None,
+                    cpus=None,
+                )
+                resource_list.append(r)
+            return resource_list
+
+        # Currently, handle a filter on accelerators only.
+        accelerators = resources.accelerators
+        if accelerators is None:
+            # Return a default instance type
+            default_instance_type = Nebius.get_default_instance_type(
+                cpus=resources.cpus,
+                memory=resources.memory,
+                disk_tier=resources.disk_tier)
+            if default_instance_type is None:
+                # TODO: Add hints to all return values in this method to help
+                # users understand why the resources are not launchable.
+                return resources_utils.FeasibleResources([], [], None)
+            else:
+                return resources_utils.FeasibleResources(
+                    _make([default_instance_type]), [], None)
+
+        assert len(accelerators) == 1, resources
+        acc, acc_count = list(accelerators.items())[0]
+        (instance_list, fuzzy_candidate_list
+        ) = service_catalog.get_instance_type_for_accelerator(
+            acc,
+            acc_count,
+            use_spot=resources.use_spot,
+            cpus=resources.cpus,
+            region=resources.region,
+            zone=resources.zone,
+            clouds='nebius')
+        if instance_list is None:
+            return resources_utils.FeasibleResources([], fuzzy_candidate_list,
+                                                     None)
+        return resources_utils.FeasibleResources(_make(instance_list),
+                                                 fuzzy_candidate_list, None)
+
+    @classmethod
+    def check_credentials(cls) -> Tuple[bool, Optional[str]]:
+        """Verifies that the user has valid credentials for Nebius."""
+        logging.debug('Nebius cloud check credentials')
+        sdk = nebius.sdk(credentials=nebius.get_iam_token())
+        try:
+            service = nebius.iam().ProjectServiceClient(sdk)
+            service.list(nebius.iam().ListProjectsRequest(
+                parent_id=nebius.get_tenant_id())).wait()
+        except nebius.request_error() as e:
+            return False, (
+                f'{e.status} \n'  # First line is indented by 4 spaces
+                '    Credentials can be set up by running: \n'
+                f'    $ nebius iam get-access-token > {nebius.NEBIUS_IAM_TOKEN_PATH} \n'  # pylint: disable=line-too-long
+                '    Copy your tenant ID from the web console and save it to file \n'  # pylint: disable=line-too-long
+                f'    $ echo NB_TENANT_ID > {nebius.NB_TENANT_ID_PATH} \n')
+
+        return True, None
+
+    def get_credential_file_mounts(self) -> Dict[str, str]:
+        logging.debug('Nebius cloud get credential file mounts')
+        # _CREDENTIAL_FILES already contains full '~/.nebius/...' paths, so
+        # mount each file to the same path on the remote VM.
+        return {path: path for path in _CREDENTIAL_FILES}
+
+    @classmethod
+    def get_current_user_identity(cls) -> Optional[List[str]]:
+        logging.debug('Nebius cloud get current user identity')
+        # NOTE: used for very advanced SkyPilot functionality
+        # Can implement later if desired
+        return None
+
+    def instance_type_exists(self, instance_type: str) -> bool:
+        logging.debug('Nebius cloud instance type exists: %s', instance_type)
+        return service_catalog.instance_type_exists(instance_type, 'nebius')
+
+    def validate_region_zone(self, region: Optional[str], zone: Optional[str]):
+        logging.debug('Nebius cloud validate region zone: %s', zone)
+        return service_catalog.validate_region_zone(region,
+                                                    zone,
+                                                    clouds='nebius')
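With the cloud class in place, Nebius can be targeted from the Python API like any other registered cloud. A hypothetical end-to-end sketch (not part of the diff; assumes the Nebius catalog offers H100s and that credentials were saved as `check_credentials()` describes):

```python
import sky

# One-time credential setup, per check_credentials() above:
#   $ nebius iam get-access-token > ~/.nebius/NEBIUS_IAM_TOKEN.txt
#   $ echo <your-tenant-id> > ~/.nebius/NB_TENANT_ID.txt
task = sky.Task(run='nvidia-smi')
task.set_resources(sky.Resources(cloud=sky.Nebius(), accelerators='H100:8'))
sky.launch(task, cluster_name='nebius-demo')
```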
+""" +import typing +from typing import Dict, List, Optional, Tuple, Union + +from sky.clouds.service_catalog import common +from sky.utils import resources_utils +from sky.utils import ux_utils + +if typing.TYPE_CHECKING: + from sky.clouds import cloud + +# Keep it synced with the frequency in +# skypilot-catalog/.github/workflows/update-Nebius-catalog.yml +_PULL_FREQUENCY_HOURS = 7 + +_df = common.read_catalog('nebius/vms.csv') + + +def instance_type_exists(instance_type: str) -> bool: + return common.instance_type_exists_impl(_df, instance_type) + + +def validate_region_zone( + region: Optional[str], + zone: Optional[str]) -> Tuple[Optional[str], Optional[str]]: + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Nebius does not support zones.') + return common.validate_region_zone_impl('nebius', _df, region, zone) + + +def get_hourly_cost(instance_type: str, + use_spot: bool = False, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + """Returns the cost, or the cheapest cost among all zones for spot.""" + assert not use_spot, 'Nebius does not support spot.' + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Nebius does not support zones.') + return common.get_hourly_cost_impl(_df, instance_type, use_spot, region, + zone) + + +def get_vcpus_mem_from_instance_type( + instance_type: str) -> Tuple[Optional[float], Optional[float]]: + return common.get_vcpus_mem_from_instance_type_impl(_df, instance_type) + + +def get_default_instance_type( + cpus: Optional[str] = None, + memory: Optional[str] = None, + disk_tier: Optional[resources_utils.DiskTier] = None) -> Optional[str]: + del disk_tier # unused + return common.get_instance_type_for_cpus_mem_impl(_df, cpus, memory) + + +def get_accelerators_from_instance_type( + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: + return common.get_accelerators_from_instance_type_impl(_df, instance_type) + + +def get_instance_type_for_accelerator( + acc_name: str, + acc_count: int, + cpus: Optional[str] = None, + memory: Optional[str] = None, + use_spot: bool = False, + region: Optional[str] = None, + zone: Optional[str] = None) -> Tuple[Optional[List[str]], List[str]]: + """Filter the instance types based on resource requirements. + + Returns a list of instance types satisfying the required count of + accelerators with sorted prices and a list of candidates with fuzzy search. + """ + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Nebius does not support zones.') + return common.get_instance_type_for_accelerator_impl(df=_df, + acc_name=acc_name, + acc_count=acc_count, + cpus=cpus, + memory=memory, + use_spot=use_spot, + region=region, + zone=zone) + + +def regions() -> List['cloud.Region']: + return common.get_region_zones(_df, use_spot=False) + + +def get_region_zones_for_instance_type(instance_type: str, + use_spot: bool) -> List['cloud.Region']: + df = _df[_df['InstanceType'] == instance_type] + return common.get_region_zones(df, use_spot) + + +def list_accelerators( + gpus_only: bool, + name_filter: Optional[str], + region_filter: Optional[str], + quantity_filter: Optional[int], + case_sensitive: bool = True, + all_regions: bool = False, + require_price: bool = True) -> Dict[str, List[common.InstanceTypeInfo]]: + """Returns all instance types in Nebius offering GPUs.""" + + del require_price # Unused. 
+    return common.list_accelerators_impl('nebius', _df, gpus_only, name_filter,
+                                         region_filter, quantity_filter,
+                                         case_sensitive, all_regions)
diff --git a/sky/provision/__init__.py b/sky/provision/__init__.py
index 02a627b08a3..a5b63acafe8 100644
--- a/sky/provision/__init__.py
+++ b/sky/provision/__init__.py
@@ -20,6 +20,7 @@
 from sky.provision import gcp
 from sky.provision import kubernetes
 from sky.provision import lambda_cloud
+from sky.provision import nebius
 from sky.provision import oci
 from sky.provision import runpod
 from sky.provision import vsphere
diff --git a/sky/provision/nebius/__init__.py b/sky/provision/nebius/__init__.py
new file mode 100644
index 00000000000..6b57b00edf0
--- /dev/null
+++ b/sky/provision/nebius/__init__.py
@@ -0,0 +1,10 @@
+"""Nebius provisioner for SkyPilot."""
+
+from sky.provision.nebius.config import bootstrap_instances
+from sky.provision.nebius.instance import cleanup_ports
+from sky.provision.nebius.instance import get_cluster_info
+from sky.provision.nebius.instance import query_instances
+from sky.provision.nebius.instance import run_instances
+from sky.provision.nebius.instance import stop_instances
+from sky.provision.nebius.instance import terminate_instances
+from sky.provision.nebius.instance import wait_instances
diff --git a/sky/provision/nebius/config.py b/sky/provision/nebius/config.py
new file mode 100644
index 00000000000..4607a3bc0bb
--- /dev/null
+++ b/sky/provision/nebius/config.py
@@ -0,0 +1,11 @@
+"""Nebius configuration bootstrapping."""
+
+from sky.provision import common
+
+
+def bootstrap_instances(
+        region: str, cluster_name: str,
+        config: common.ProvisionConfig) -> common.ProvisionConfig:
+    """Bootstraps instances for the given cluster."""
+    del region, cluster_name  # unused
+    return config
diff --git a/sky/provision/nebius/instance.py b/sky/provision/nebius/instance.py
new file mode 100644
index 00000000000..e9317ca45e9
--- /dev/null
+++ b/sky/provision/nebius/instance.py
@@ -0,0 +1,246 @@
+"""Nebius instance provisioning."""
+import time
+from time import sleep
+from typing import Any, Dict, List, Optional
+
+from sky import sky_logging
+from sky import status_lib
+from sky.provision import common
+from sky.provision.nebius import utils
+from sky.utils import common_utils
+from sky.utils import ux_utils
+
+PENDING_STATUS = ['STARTING', 'DELETING', 'STOPPING']
+
+MAX_RETRIES_TO_LAUNCH = 120  # Maximum number of retries
+
+logger = sky_logging.init_logger(__name__)
+
+
+def _filter_instances(region: str,
+                      cluster_name_on_cloud: str,
+                      status_filters: Optional[List[str]],
+                      head_only: bool = False) -> Dict[str, Any]:
+    project_id = utils.get_project_by_region(region)
+    instances = utils.list_instances(project_id)
+    possible_names = [f'{cluster_name_on_cloud}-head']
+    if not head_only:
+        possible_names.append(f'{cluster_name_on_cloud}-worker')
+    filtered_instances = {}
+    for instance_id, instance in instances.items():
+        if (status_filters is not None and
+                instance['status'] not in status_filters):
+            continue
+        if instance.get('name') in possible_names:
+            filtered_instances[instance_id] = instance
+    return filtered_instances
+
+
+def _get_head_instance_id(instances: Dict[str, Any]) -> Optional[str]:
+    head_instance_id = None
+    for inst_id, inst in instances.items():
+        if inst['name'].endswith('-head'):
+            head_instance_id = inst_id
+            break
+    return head_instance_id
+
+
+def _wait_until_no_pending(region: str, cluster_name_on_cloud: str) -> None:
+    retry_count = 0
+    while retry_count < MAX_RETRIES_TO_LAUNCH:
+        instances = _filter_instances(region, cluster_name_on_cloud,
+                                      PENDING_STATUS)
+        if not instances:
+            break
+        logger.info(f'Waiting for {len(instances)} instances to be ready '
+                    f'(Attempt {retry_count + 1}/{MAX_RETRIES_TO_LAUNCH}).')
+        time.sleep(utils.POLL_INTERVAL)
+        retry_count += 1
+
+    if retry_count == MAX_RETRIES_TO_LAUNCH:
+        raise TimeoutError(f'Exceeded maximum retries '
+                           f'({MAX_RETRIES_TO_LAUNCH * utils.POLL_INTERVAL}'
+                           f' seconds) while waiting for instances'
+                           f' to be ready.')
+def run_instances(region: str, cluster_name_on_cloud: str,
+                  config: common.ProvisionConfig) -> common.ProvisionRecord:
+    """Runs instances for the given cluster."""
+    _wait_until_no_pending(region, cluster_name_on_cloud)
+    running_instances = _filter_instances(region, cluster_name_on_cloud,
+                                          ['RUNNING'])
+    head_instance_id = _get_head_instance_id(running_instances)
+    to_start_count = config.count - len(running_instances)
+    if to_start_count < 0:
+        raise RuntimeError(
+            f'Cluster {cluster_name_on_cloud} already has '
+            f'{len(running_instances)} nodes, but {config.count} are required.')
+    if to_start_count == 0:
+        if head_instance_id is None:
+            raise RuntimeError(
+                f'Cluster {cluster_name_on_cloud} has no head node.')
+        logger.info(f'Cluster {cluster_name_on_cloud} already has '
+                    f'{len(running_instances)} nodes, no need to start more.')
+        return common.ProvisionRecord(provider_name='nebius',
+                                      cluster_name=cluster_name_on_cloud,
+                                      region=region,
+                                      zone=None,
+                                      head_instance_id=head_instance_id,
+                                      resumed_instance_ids=[],
+                                      created_instance_ids=[])
+
+    created_instance_ids = []
+    resumed_instance_ids = []
+    stopped_instances = _filter_instances(region, cluster_name_on_cloud,
+                                          ['STOPPED'])
+    # pylint: disable=consider-using-dict-items
+    for stopped_instance_id in stopped_instances:
+        if to_start_count > 0:
+            try:
+                utils.start(stopped_instance_id)
+                resumed_instance_ids.append(stopped_instance_id)
+                to_start_count -= 1
+                if stopped_instances[stopped_instance_id]['name'].endswith(
+                        '-head'):
+                    head_instance_id = stopped_instance_id
+            except Exception as e:  # pylint: disable=broad-except
+                logger.warning(f'Start instance error: {e}')
+                raise
+            sleep(utils.POLL_INTERVAL)  # to avoid fake STOPPED status
+            logger.info(f'Started instance {stopped_instance_id}.')
+
+    for _ in range(to_start_count):
+        node_type = 'head' if head_instance_id is None else 'worker'
+        try:
+            platform, preset = config.node_config['InstanceType'].split('_')
+            instance_id = utils.launch(
+                name=f'{cluster_name_on_cloud}-{node_type}',
+                platform=platform,
+                preset=preset,
+                region=region,
+                image_family=config.node_config['ImageId'],
+                disk_size=config.node_config['DiskSize'],
+                user_data=config.node_config['UserData'])
+        except Exception as e:  # pylint: disable=broad-except
+            logger.warning(f'run_instances error: {e}')
+            raise
+        logger.info(f'Launched instance {instance_id}.')
+        created_instance_ids.append(instance_id)
+        if head_instance_id is None:
+            head_instance_id = instance_id
+    assert head_instance_id is not None, 'head_instance_id should not be None'
+    return common.ProvisionRecord(provider_name='nebius',
+                                  cluster_name=cluster_name_on_cloud,
+                                  region=region,
+                                  zone=None,
+                                  head_instance_id=head_instance_id,
+                                  resumed_instance_ids=resumed_instance_ids,
+                                  created_instance_ids=created_instance_ids)
+
+
+def wait_instances(region: str, cluster_name_on_cloud: str,
+                   state: Optional[status_lib.ClusterStatus]) -> None:
+    del state
+    _wait_until_no_pending(region, cluster_name_on_cloud)
+def stop_instances(
+    cluster_name_on_cloud: str,
+    provider_config: Optional[Dict[str, Any]] = None,
+    worker_only: bool = False,
+) -> None:
+    if provider_config is not None and provider_config['region']:
+        exist_instances = _filter_instances(provider_config['region'],
+                                            cluster_name_on_cloud, ['RUNNING'])
+        for inst_id, inst in exist_instances.items():
+            # The dict keys are instance IDs; check the instance *name* to
+            # decide whether this is the head node.
+            if worker_only and inst['name'].endswith('-head'):
+                continue
+            utils.stop(inst_id)
+
+
+def terminate_instances(
+    cluster_name_on_cloud: str,
+    provider_config: Optional[Dict[str, Any]] = None,
+    worker_only: bool = False,
+) -> None:
+    """See sky/provision/__init__.py"""
+    if provider_config is not None and provider_config['region']:
+        instances = _filter_instances(provider_config['region'],
+                                      cluster_name_on_cloud, None)
+        for inst_id, inst in instances.items():
+            logger.debug(f'Terminating instance {inst_id}: {inst}')
+            if worker_only and inst['name'].endswith('-head'):
+                continue
+            try:
+                utils.remove(inst_id)
+            except Exception as e:  # pylint: disable=broad-except
+                with ux_utils.print_exception_no_traceback():
+                    raise RuntimeError(
+                        f'Failed to terminate instance {inst_id}: '
+                        f'{common_utils.format_exception(e, use_bracket=False)}'
+                    ) from e
+        utils.delete_cluster(cluster_name_on_cloud, provider_config['region'])
+
+
+def get_cluster_info(
+        region: str,
+        cluster_name_on_cloud: str,
+        provider_config: Optional[Dict[str, Any]] = None
+) -> common.ClusterInfo:
+    _wait_until_no_pending(region, cluster_name_on_cloud)
+    running_instances = _filter_instances(region, cluster_name_on_cloud,
+                                          ['RUNNING'])
+    instances: Dict[str, List[common.InstanceInfo]] = {}
+    head_instance_id = None
+    for instance_id, instance_info in running_instances.items():
+        instances[instance_id] = [
+            common.InstanceInfo(
+                instance_id=instance_id,
+                internal_ip=instance_info['internal_ip'],
+                external_ip=instance_info['external_ip'],
+                tags={},
+            )
+        ]
+        if instance_info['name'].endswith('-head'):
+            head_instance_id = instance_id
+
+    return common.ClusterInfo(
+        instances=instances,
+        head_instance_id=head_instance_id,
+        provider_name='nebius',
+        provider_config=provider_config,
+    )
+
+
+def query_instances(
+    cluster_name_on_cloud: str,
+    provider_config: Optional[Dict[str, Any]] = None,
+    non_terminated_only: bool = True,
+) -> Dict[str, Optional[status_lib.ClusterStatus]]:
+    """See sky/provision/__init__.py"""
+    assert provider_config is not None, (cluster_name_on_cloud, provider_config)
+    instances = _filter_instances(provider_config['region'],
+                                  cluster_name_on_cloud, None)
+
+    status_map = {
+        'STARTING': status_lib.ClusterStatus.INIT,
+        'RUNNING': status_lib.ClusterStatus.UP,
+        'STOPPED': status_lib.ClusterStatus.STOPPED,
+        'STOPPING': status_lib.ClusterStatus.STOPPED,
+        'DELETING': status_lib.ClusterStatus.STOPPED,
+    }
+    statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {}
+    for inst_id, inst in instances.items():
+        status = status_map[inst['status']]
+        if non_terminated_only and status is None:
+            continue
+        statuses[inst_id] = status
+    return statuses
+
+
+def cleanup_ports(
+    cluster_name_on_cloud: str,
+    ports: List[str],
+    provider_config: Optional[Dict[str, Any]] = None,
+) -> None:
+    del cluster_name_on_cloud, ports, provider_config  # Unused.
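As a quick sanity check, the provisioner above can be exercised directly (illustrative only; the cluster name and the presence of instances are assumptions):

```python
from sky.provision.nebius import instance as nebius_instance

# Maps each matching instance ID to a ClusterStatus (INIT/UP/STOPPED),
# via the status_map in query_instances() above.
statuses = nebius_instance.query_instances(
    'sky-demo-cluster', provider_config={'region': 'eu-north1'})
```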
diff --git a/sky/provision/nebius/utils.py b/sky/provision/nebius/utils.py
new file mode 100644
index 00000000000..75af6838e6d
--- /dev/null
+++ b/sky/provision/nebius/utils.py
@@ -0,0 +1,258 @@
+"""Nebius library wrapper for SkyPilot."""
+import time
+from typing import Any, Dict
+
+from sky import sky_logging
+from sky.adaptors import nebius
+from sky.utils import common_utils
+
+logger = sky_logging.init_logger(__name__)
+
+sdk = nebius.sdk(credentials=nebius.get_iam_token())
+
+POLL_INTERVAL = 5
+
+
+def retry(func):
+    """Decorator to retry a function."""
+
+    def wrapper(*args, **kwargs):
+        """Wrapper for retrying a function."""
+        cnt = 0
+        while True:
+            try:
+                return func(*args, **kwargs)
+            except nebius.nebius.error.QueryError as e:
+                if cnt >= 3:
+                    raise
+                logger.warning('Retrying for exception: '
+                               f'{common_utils.format_exception(e)}.')
+                time.sleep(POLL_INTERVAL)
+                cnt += 1
+
+    return wrapper
+
+
+def get_project_by_region(region: str) -> str:
+    service = nebius.iam().ProjectServiceClient(sdk)
+    projects = service.list(nebius.iam().ListProjectsRequest(
+        parent_id=nebius.get_tenant_id())).wait()
+    # The project ID encodes the region right after the 'project-' prefix.
+    for project in projects.items:
+        if region == 'eu-north1' and project.metadata.id[8:11] == 'e00':
+            return project.metadata.id
+        if region == 'eu-west1' and project.metadata.id[8:11] == 'e01':
+            return project.metadata.id
+    raise Exception(f'No project found for region "{region}".')
+
+
+def get_or_create_gpu_cluster(name: str, region: str) -> str:
+    """Creates a GPU cluster, or returns the ID of an existing one.
+
+    When creating a GPU cluster, select an InfiniBand fabric for it:
+
+    fabric-2, fabric-3 or fabric-4 for projects in the eu-north1 region.
+    fabric-5 for projects in the eu-west1 region.
+
+    https://docs.nebius.com/compute/clusters/gpu
+    """
+    project_id = get_project_by_region(region)
+    service = nebius.compute().GpuClusterServiceClient(sdk)
+    try:
+        cluster = service.get_by_name(nebius.nebius_common().GetByNameRequest(
+            parent_id=project_id,
+            name=name,
+        )).wait()
+        cluster_id = cluster.metadata.id
+    except nebius.request_error() as no_cluster_found_error:
+        if region == 'eu-north1':
+            fabric = 'fabric-4'
+        elif region == 'eu-west1':
+            fabric = 'fabric-5'
+        else:
+            raise RuntimeError(
+                f'Unsupported region {region}.') from no_cluster_found_error
+        cluster = service.create(nebius.compute().CreateGpuClusterRequest(
+            metadata=nebius.nebius_common().ResourceMetadata(
+                parent_id=project_id,
+                name=name,
+            ),
+            spec=nebius.compute().GpuClusterSpec(
+                infiniband_fabric=fabric))).wait()
+        cluster_id = cluster.resource_id
+    return cluster_id
+
+
+def delete_cluster(name: str, region: str) -> None:
+    """Delete a GPU cluster."""
+    project_id = get_project_by_region(region)
+    service = nebius.compute().GpuClusterServiceClient(sdk)
+    try:
+        cluster = service.get_by_name(nebius.nebius_common().GetByNameRequest(
+            parent_id=project_id,
+            name=name,
+        )).wait()
+        cluster_id = cluster.metadata.id
+        logger.debug(f'Found GPU Cluster: {cluster_id}.')
+        service.delete(
+            nebius.compute().DeleteGpuClusterRequest(id=cluster_id)).wait()
+        logger.debug(f'Deleted GPU Cluster: {cluster_id}.')
+    except nebius.request_error():
+        logger.debug('GPU Cluster does not exist.')
+def list_instances(project_id: str) -> Dict[str, Dict[str, Any]]:
+    """Lists instances in the given project."""
+    service = nebius.compute().InstanceServiceClient(sdk)
+    result = service.list(
+        nebius.compute().ListInstancesRequest(parent_id=project_id)).wait()
+
+    instances = result
+
+    instance_dict: Dict[str, Dict[str, Any]] = {}
+    for instance in instances.items:
+        info = {}
+        info['status'] = instance.status.state.name
+        info['name'] = instance.metadata.name
+        if instance.status.network_interfaces:
+            info['external_ip'] = instance.status.network_interfaces[
+                0].public_ip_address.address.split('/')[0]
+            info['internal_ip'] = instance.status.network_interfaces[
+                0].ip_address.address.split('/')[0]
+        instance_dict[instance.metadata.id] = info
+
+    return instance_dict
+
+
+def stop(instance_id: str) -> None:
+    service = nebius.compute().InstanceServiceClient(sdk)
+    service.stop(nebius.compute().StopInstanceRequest(id=instance_id)).wait()
+
+
+def start(instance_id: str) -> None:
+    service = nebius.compute().InstanceServiceClient(sdk)
+    service.start(nebius.compute().StartInstanceRequest(id=instance_id)).wait()
+
+
+def launch(name: str, platform: str, preset: str, region: str,
+           image_family: str, disk_size: int, user_data: str) -> str:
+    logger.debug(f'Launching instance: {name}')
+    disk_name = 'disk-' + name
+    cluster_id = None
+    cluster_name = '-'.join(name.split('-')[:4])
+    # 8 GPU virtual machines can be grouped into a GPU cluster.
+    # The GPU clusters are built with InfiniBand secure high-speed networking.
+    # https://docs.nebius.com/compute/clusters/gpu
+    if platform in ('gpu-h100-sxm', 'gpu-h200-sxm'):
+        if preset == '8gpu-128vcpu-1600gb':
+            cluster_id = get_or_create_gpu_cluster(cluster_name, region)
+
+    project_id = get_project_by_region(region)
+    service = nebius.compute().DiskServiceClient(sdk)
+    disk = service.create(nebius.compute().CreateDiskRequest(
+        metadata=nebius.nebius_common().ResourceMetadata(
+            parent_id=project_id,
+            name=disk_name,
+        ),
+        spec=nebius.compute().DiskSpec(
+            source_image_family=nebius.compute().SourceImageFamily(
+                image_family=image_family),
+            size_gibibytes=disk_size,
+            type=nebius.compute().DiskSpec.DiskType.NETWORK_SSD,
+        ))).wait()
+    disk_id = disk.resource_id
+    retry_count = 0
+    while retry_count < nebius.MAX_RETRIES_TO_DISK_CREATE:
+        disk = service.get_by_name(nebius.nebius_common().GetByNameRequest(
+            parent_id=project_id,
+            name=disk_name,
+        )).wait()
+        if disk.status.state.name == 'READY':
+            break
+        logger.debug(f'Waiting for disk {disk_name} to be ready.')
+        time.sleep(POLL_INTERVAL)
+        retry_count += 1
+
+    if retry_count == nebius.MAX_RETRIES_TO_DISK_CREATE:
+        raise TimeoutError(
+            f'Exceeded maximum retries '
+            f'({nebius.MAX_RETRIES_TO_DISK_CREATE * POLL_INTERVAL}'
+            f' seconds) while waiting for disk {disk_name}'
+            f' to be ready.')
+
+    service = nebius.vpc().SubnetServiceClient(sdk)
+    sub_net = service.list(nebius.vpc().ListSubnetsRequest(
+        parent_id=project_id,)).wait()
+
+    service = nebius.compute().InstanceServiceClient(sdk)
+    service.create(nebius.compute().CreateInstanceRequest(
+        metadata=nebius.nebius_common().ResourceMetadata(
+            parent_id=project_id,
+            name=name,
+        ),
+        spec=nebius.compute().InstanceSpec(
+            gpu_cluster=nebius.compute().InstanceGpuClusterSpec(id=cluster_id,)
+            if cluster_id else None,
+            boot_disk=nebius.compute().AttachedDiskSpec(
+                attach_mode=nebius.compute(
+                ).AttachedDiskSpec.AttachMode.READ_WRITE,
+                existing_disk=nebius.compute().ExistingDisk(id=disk_id)),
+            cloud_init_user_data=user_data,
+            resources=nebius.compute().ResourcesSpec(platform=platform,
+                                                     preset=preset),
+            network_interfaces=[
+                nebius.compute().NetworkInterfaceSpec(
+                    subnet_id=sub_net.items[0].metadata.id,
+                    ip_address=nebius.compute().IPAddress(),
+                    name='network-interface-0',
+                    public_ip_address=nebius.compute().PublicIPAddress())
+            ]))).wait()
+    instance_id = ''
+    retry_count = 0
+    while retry_count < nebius.MAX_RETRIES_TO_INSTANCE_READY:
+        service = nebius.compute().InstanceServiceClient(sdk)
+        instance = service.get_by_name(nebius.nebius_common().GetByNameRequest(
+            parent_id=project_id,
+            name=name,
+        )).wait()
+        if instance.status.state.name == 'STARTING':
+            instance_id = instance.metadata.id
+            break
+        time.sleep(POLL_INTERVAL)
+        logger.debug(f'Waiting for instance {name} to start running.')
+        retry_count += 1
+
+    if retry_count == nebius.MAX_RETRIES_TO_INSTANCE_READY:
+        raise TimeoutError(
+            f'Exceeded maximum retries '
+            f'({nebius.MAX_RETRIES_TO_INSTANCE_READY * POLL_INTERVAL}'
+            f' seconds) while waiting for instance {name}'
+            f' to be ready.')
+    return instance_id
+
+
+def remove(instance_id: str) -> None:
+    """Terminates the given instance and deletes its boot disk."""
+    service = nebius.compute().InstanceServiceClient(sdk)
+    result = service.get(
+        nebius.compute().GetInstanceRequest(id=instance_id)).wait()
+    disk_id = result.spec.boot_disk.existing_disk.id
+    service.delete(
+        nebius.compute().DeleteInstanceRequest(id=instance_id)).wait()
+    retry_count = 0
+    while retry_count < nebius.MAX_RETRIES_TO_DISK_DELETE:
+        try:
+            service = nebius.compute().DiskServiceClient(sdk)
+            service.delete(
+                nebius.compute().DeleteDiskRequest(id=disk_id)).wait()
+            break
+        except nebius.request_error():
+            logger.debug('Waiting for disk deletion.')
+            time.sleep(POLL_INTERVAL)
+            retry_count += 1
+
+    if retry_count == nebius.MAX_RETRIES_TO_DISK_DELETE:
+        raise TimeoutError(
+            f'Exceeded maximum retries '
+            f'({nebius.MAX_RETRIES_TO_DISK_DELETE * POLL_INTERVAL}'
+            f' seconds) while waiting for disk {disk_id}'
+            f' to be deleted.')
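Two details above are easy to miss. Every wait loop follows the same bounded-polling pattern: at most 120 retries with a 5-second sleep, i.e. a 10-minute cap (120 × 5 = 600 s). And `get_project_by_region` infers the region from a substring of the project ID; a sketch with hypothetical IDs (this mapping is an inference from the code, not documented behavior):

```python
# 'project-' is 8 characters long, so id[8:11] is the 3 characters after it.
assert 'project-e00aaaaaaaaaaaaaaaaaaaa'[8:11] == 'e00'  # -> eu-north1
assert 'project-e01bbbbbbbbbbbbbbbbbbbb'[8:11] == 'e01'  # -> eu-west1
```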
diff --git a/sky/setup_files/dependencies.py b/sky/setup_files/dependencies.py
index 13b99770e5b..40a05ba042c 100644
--- a/sky/setup_files/dependencies.py
+++ b/sky/setup_files/dependencies.py
@@ -139,6 +139,7 @@
         # docs instead.
         # 'vsphere-automation-sdk @ git+https://github.com/vmware/vsphere-automation-sdk-python.git@v8.0.1.0' pylint: disable=line-too-long
     ],
+    'nebius': ['nebius @ git+https://github.com/nebius/pysdk@876bb16']
 }
 extras_require['all'] = sum(extras_require.values(), [])
diff --git a/sky/templates/nebius-ray.yml.j2 b/sky/templates/nebius-ray.yml.j2
new file mode 100644
index 00000000000..740d0211e4d
--- /dev/null
+++ b/sky/templates/nebius-ray.yml.j2
@@ -0,0 +1,70 @@
+cluster_name: {{cluster_name_on_cloud}}
+
+# The maximum number of worker nodes to launch in addition to the head node.
+max_workers: {{num_nodes - 1}}
+upscaling_speed: {{num_nodes - 1}}
+idle_timeout_minutes: 60
+
+provider:
+  type: external
+  module: sky.provision.nebius
+  region: "{{region}}"
+
+auth:
+  ssh_user: ubuntu
+  ssh_private_key: {{ssh_private_key}}
+
+available_node_types:
+  ray_head_default:
+    resources: {}
+    node_config:
+      InstanceType: {{instance_type}}
+      ImageId: {{image_id}}
+      DiskSize: {{disk_size}}
+      UserData: |
+        users:
+          - name: skypilot:ssh_user
+            shell: /bin/bash
+            sudo: ALL=(ALL) NOPASSWD:ALL
+            ssh_authorized_keys:
+              - |-
+                skypilot:ssh_public_key_content
+
+head_node_type: ray_head_default
+
+# Format: `REMOTE_PATH : LOCAL_PATH`
+file_mounts: {
+  "{{sky_ray_yaml_remote_path}}": "{{sky_ray_yaml_local_path}}",
+  "{{sky_remote_path}}/{{sky_wheel_hash}}": "{{sky_local_path}}",
+{%- for remote_path, local_path in credentials.items() %}
+  "{{remote_path}}": "{{local_path}}",
+{%- endfor %}
+}
+
+rsync_exclude: []
+
+initialization_commands: []
+
+# List of shell commands to run to set up nodes.
+# NOTE: these are very performance-sensitive. Each new item opens/closes an SSH
+# connection, which is expensive. Try your best to co-locate commands into fewer
+# items!
+#
+# Increment the following to make catching performance bugs easier:
+#   current num items (num SSH connections): 1
+setup_commands:
+  # Disable `unattended-upgrades` to prevent apt-get from hanging. It should run at the beginning, before other processes start, to avoid being blocked. (This is a temporary fix.)
+  # Create ~/.ssh/config file in case the file does not exist in the image.
+  # Line 'rm ..': there is another installation of pip.
+  # Line 'sudo bash ..': set the ulimit as suggested by ray docs for performance. https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html#system-configuration
+  # Line 'sudo grep ..': set the number of threads per process to unlimited to avoid `ray job submit` getting stuck when the number of running ray jobs increases.
+  # Line 'mkdir -p ..': disable host key check
+  # Line 'python3 -c ..': patch the buggy ray files and enable `-o allow_other` option for `goofys`
+  - mkdir -p ~/.ssh; touch ~/.ssh/config;
+    {{ conda_installation_commands }}
+    source ~/.bashrc;
+    {{ ray_skypilot_installation_commands }}
+    sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1048576" >> /etc/security/limits.conf; echo "* hard nofile 1048576" >> /etc/security/limits.conf';
+    sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || (sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf'); sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity; sudo systemctl daemon-reload;
+    mkdir -p ~/.ssh; (grep -Pzo -q "Host \*\n  StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n  StrictHostKeyChecking no\n" >> ~/.ssh/config;
+    [ -f /etc/fuse.conf ] && sudo sed -i 's/#user_allow_other/user_allow_other/g' /etc/fuse.conf || (sudo sh -c 'echo "user_allow_other" > /etc/fuse.conf');
diff --git a/tests/conftest.py b/tests/conftest.py
index af6367fdac6..a2192fcd9cc 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -22,7 +22,8 @@
 # --managed-jobs.
 all_clouds_in_smoke_tests = [
     'aws', 'gcp', 'azure', 'lambda', 'cloudflare', 'ibm', 'scp', 'oci', 'do',
-    'kubernetes', 'vsphere', 'cudo', 'fluidstack', 'paperspace', 'runpod'
+    'kubernetes', 'vsphere', 'cudo', 'fluidstack', 'paperspace', 'runpod',
+    'nebius'
 ]
 default_clouds_to_run = ['aws', 'azure']
 
@@ -44,7 +45,8 @@
     'cudo': 'cudo',
     'paperspace': 'paperspace',
     'do': 'do',
-    'runpod': 'runpod'
+    'runpod': 'runpod',
+    'nebius': 'nebius'
 }
diff --git a/tests/smoke_tests/smoke_tests_utils.py b/tests/smoke_tests/smoke_tests_utils.py
index 14f2b94a5d4..f8a8f2f7024 100644
--- a/tests/smoke_tests/smoke_tests_utils.py
+++ b/tests/smoke_tests/smoke_tests_utils.py
@@ -20,7 +20,7 @@
 # To avoid the second smoke test reusing the cluster launched in the first
 # smoke test. Also required for test_managed_jobs_recovery to make sure the
 # manual termination with aws ec2 does not accidentally terminate other clusters
-# for for the different managed jobs launch with the same job name but a
+# for the different managed jobs launch with the same job name but a
 # different job id.
 test_id = str(uuid.uuid4())[-2:]
diff --git a/tests/smoke_tests/test_basic.py b/tests/smoke_tests/test_basic.py
index 30576d3272f..44cb41d74df 100644
--- a/tests/smoke_tests/test_basic.py
+++ b/tests/smoke_tests/test_basic.py
@@ -119,6 +119,7 @@ def test_launch_fast(generic_cloud: str):
 @pytest.mark.no_lambda_cloud
 @pytest.mark.no_ibm
 @pytest.mark.no_kubernetes
+@pytest.mark.no_nebius
 def test_launch_fast_with_autostop(generic_cloud: str):
     name = smoke_tests_utils.get_cluster_name()
     # Azure takes ~ 7m15s (435s) to autostop a VM, so here we use 600 to ensure
@@ -423,6 +424,7 @@ def test_load_dump_yaml_config_equivalent(self):
 @pytest.mark.no_fluidstack  # Fluidstack does not support K80 gpus for now
 @pytest.mark.no_paperspace  # Paperspace does not support K80 gpus
 @pytest.mark.no_do  # DO does not support K80s
+@pytest.mark.no_nebius  # Nebius does not support K80s
 def test_multiple_accelerators_ordered():
     name = smoke_tests_utils.get_cluster_name()
     test = smoke_tests_utils.Test(
@@ -440,6 +442,7 @@ def test_multiple_accelerators_ordered():
 @pytest.mark.no_fluidstack  # Fluidstack has low availability for T4 GPUs
 @pytest.mark.no_paperspace  # Paperspace does not support T4 GPUs
 @pytest.mark.no_do  # DO does not have multiple accelerators
+@pytest.mark.no_nebius  # Nebius does not support T4 GPUs
 def test_multiple_accelerators_ordered_with_default():
     name = smoke_tests_utils.get_cluster_name()
     test = smoke_tests_utils.Test(
@@ -457,6 +460,7 @@ def test_multiple_accelerators_ordered_with_default():
 @pytest.mark.no_fluidstack  # Fluidstack has low availability for T4 GPUs
 @pytest.mark.no_paperspace  # Paperspace does not support T4 GPUs
 @pytest.mark.no_do  # DO does not have multiple accelerators
+@pytest.mark.no_nebius  # Nebius does not support T4 GPUs
 def test_multiple_accelerators_unordered():
     name = smoke_tests_utils.get_cluster_name()
     test = smoke_tests_utils.Test(
@@ -473,6 +477,7 @@ def test_multiple_accelerators_unordered():
 @pytest.mark.no_fluidstack  # Fluidstack has low availability for T4 GPUs
 @pytest.mark.no_paperspace  # Paperspace does not support T4 GPUs
 @pytest.mark.no_do  # DO does not support multiple accelerators
+@pytest.mark.no_nebius  # Nebius does not support T4 GPUs
 def test_multiple_accelerators_unordered_with_default():
     name = smoke_tests_utils.get_cluster_name()
     test = smoke_tests_utils.Test(
diff --git a/tests/smoke_tests/test_cluster_job.py b/tests/smoke_tests/test_cluster_job.py
index 1fbb1b3d875..0adbe63e325 100644
--- a/tests/smoke_tests/test_cluster_job.py
+++ b/tests/smoke_tests/test_cluster_job.py
@@ -44,6 +44,7 @@
 @pytest.mark.no_scp  # SCP does not have T4 gpus. Run test_scp_job_queue instead
 @pytest.mark.no_paperspace  # Paperspace does not have T4 gpus.
 @pytest.mark.no_oci  # OCI does not have T4 gpus
+@pytest.mark.no_nebius  # Nebius does not support T4 GPUs
 @pytest.mark.parametrize('accelerator', [{'do': 'H100'}])
 def test_job_queue(generic_cloud: str, accelerator: Dict[str, str]):
     accelerator = accelerator.get(generic_cloud, 'T4')
@@ -80,6 +81,7 @@ def test_job_queue(generic_cloud: str, accelerator: Dict[str, str]):
 @pytest.mark.no_scp  # Doesn't support SCP for now
 @pytest.mark.no_oci  # Doesn't support OCI for now
 @pytest.mark.no_kubernetes  # Doesn't support Kubernetes for now
+@pytest.mark.no_nebius  # Doesn't support Nebius for now
 @pytest.mark.parametrize('accelerator', [{'do': 'H100'}])
 @pytest.mark.parametrize(
     'image_id',
@@ -220,6 +222,7 @@ def test_scp_job_queue():
 @pytest.mark.no_scp  # SCP does not support num_nodes > 1 yet
 @pytest.mark.no_oci  # OCI Cloud does not have T4 gpus.
 @pytest.mark.no_kubernetes  # Kubernetes not support num_nodes > 1 yet
+@pytest.mark.no_nebius  # Nebius Cloud does not have T4 gpus.
 @pytest.mark.parametrize('accelerator', [{'do': 'H100'}])
 def test_job_queue_multinode(generic_cloud: str, accelerator: Dict[str, str]):
     accelerator = accelerator.get(generic_cloud, 'T4')
@@ -370,6 +373,7 @@ def test_ibm_job_queue_multinode():
 @pytest.mark.no_scp  # Doesn't support SCP for now
 @pytest.mark.no_oci  # Doesn't support OCI for now
 @pytest.mark.no_kubernetes  # Doesn't support Kubernetes for now
+@pytest.mark.no_nebius  # Doesn't support Nebius for now
 # TODO(zhwu): we should fix this for kubernetes
 def test_docker_preinstalled_package(generic_cloud: str):
     name = smoke_tests_utils.get_cluster_name()
@@ -394,6 +398,7 @@ def test_docker_preinstalled_package(generic_cloud: str):
 @pytest.mark.no_scp  # SCP does not support num_nodes > 1 yet
 @pytest.mark.no_oci  # OCI Cloud does not have T4 gpus
 @pytest.mark.no_do  # DO does not have T4 gpus
+@pytest.mark.no_nebius  # Nebius does not have T4 gpus
 def test_multi_echo(generic_cloud: str):
     name = smoke_tests_utils.get_cluster_name()
     test = smoke_tests_utils.Test(
@@ -436,6 +441,7 @@ def test_multi_echo(generic_cloud: str):
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not have V100 gpus
 @pytest.mark.no_ibm  # IBM cloud currently doesn't provide public image with CUDA
 @pytest.mark.no_scp  # SCP does not have V100 (16GB) GPUs. Run test_scp_huggingface instead.
+@pytest.mark.no_nebius  # Nebius does not have T4 gpus for now
 @pytest.mark.parametrize('accelerator', [{'do': 'H100'}])
 def test_huggingface(generic_cloud: str, accelerator: Dict[str, str]):
     accelerator = accelerator.get(generic_cloud, 'T4')
@@ -966,7 +972,8 @@ def test_container_logs_two_simultaneous_jobs_kubernetes():
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not have V100 gpus
 @pytest.mark.no_ibm  # IBM cloud currently doesn't provide public image with CUDA
 @pytest.mark.no_scp  # SCP does not support num_nodes > 1 yet
-@pytest.mark.no_dos  # DO does not have V100 gpus
+@pytest.mark.no_do  # DO does not have V100 gpus
+@pytest.mark.no_nebius  # Nebius does not have V100 gpus
 @pytest.mark.skip(
     reason=
     'The resnet_distributed_tf_app is flaky, due to it failing to detect GPUs.')
@@ -1121,6 +1128,7 @@ def test_autostop(generic_cloud: str):
 # ---------- Testing Autodowning ----------
 @pytest.mark.no_fluidstack  # FluidStack does not support stopping in SkyPilot implementation
 @pytest.mark.no_scp  # SCP does not support num_nodes > 1 yet. Run test_scp_autodown instead.
+@pytest.mark.no_nebius  # Nebius does not support stopping in SkyPilot implementation
 def test_autodown(generic_cloud: str):
     name = smoke_tests_utils.get_cluster_name()
     # Azure takes ~ 13m30s (810s) to autodown a VM, so here we use 900 to ensure
@@ -1242,6 +1250,7 @@ def test_cancel_azure():
 @pytest.mark.no_ibm  # IBM cloud currently doesn't provide public image with CUDA
 @pytest.mark.no_paperspace  # Paperspace has `gnome-shell` on nvidia-smi
 @pytest.mark.no_scp  # SCP does not support num_nodes > 1 yet
+@pytest.mark.no_nebius  # Nebius Cloud does not have V100 gpus
 @pytest.mark.parametrize('accelerator', [{'do': 'H100'}])
 def test_cancel_pytorch(generic_cloud: str, accelerator: Dict[str, str]):
     accelerator = accelerator.get(generic_cloud, 'T4')
@@ -1299,6 +1308,7 @@ def test_cancel_ibm():
 @pytest.mark.no_ibm  # IBM Cloud does not support spot instances
 @pytest.mark.no_scp  # SCP does not support spot instances
 @pytest.mark.no_kubernetes  # Kubernetes does not have a notion of spot instances
+@pytest.mark.no_nebius  # Nebius does not support spot instances
 @pytest.mark.no_do
 def test_use_spot(generic_cloud: str):
     """Test use-spot and sky exec."""
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 36f2a6ea782..3af71240213 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -7,7 +7,7 @@
 import sky.cli as cli
 
 CLOUDS_TO_TEST = [
-    'aws', 'gcp', 'ibm', 'azure', 'lambda', 'scp', 'oci', 'vsphere'
+    'aws', 'gcp', 'ibm', 'azure', 'lambda', 'scp', 'oci', 'vsphere', 'nebius'
 ]
diff --git a/tests/test_list_accelerators.py b/tests/test_list_accelerators.py
index 24011441af2..f4ac3c432ab 100644
--- a/tests/test_list_accelerators.py
+++ b/tests/test_list_accelerators.py
@@ -1,7 +1,7 @@
 import sky
 
 CLOUDS_TO_TEST = [
-    'AWS', 'GCP', 'IBM', 'Azure', 'Lambda', 'OCI', 'scp', 'vsphere'
+    'AWS', 'GCP', 'IBM', 'Azure', 'Lambda', 'OCI', 'scp', 'vsphere', 'nebius'
 ]