From ebe9ca1becd62ae173562ff0f81c67056e145736 Mon Sep 17 00:00:00 2001 From: Ross Briden Date: Mon, 9 Oct 2023 23:09:15 -0700 Subject: [PATCH 1/9] start impl --- sdk/python/feast/infra/registry/exceptions.py | 2 + sdk/python/feast/infra/registry/memory.py | 661 ++++++++++++++++++ sdk/python/feast/infra/registry/sql.py | 2 +- 3 files changed, 664 insertions(+), 1 deletion(-) create mode 100644 sdk/python/feast/infra/registry/exceptions.py create mode 100644 sdk/python/feast/infra/registry/memory.py diff --git a/sdk/python/feast/infra/registry/exceptions.py b/sdk/python/feast/infra/registry/exceptions.py new file mode 100644 index 0000000000..ffdf5c54aa --- /dev/null +++ b/sdk/python/feast/infra/registry/exceptions.py @@ -0,0 +1,2 @@ +class RegistryError(Exception): + pass diff --git a/sdk/python/feast/infra/registry/memory.py b/sdk/python/feast/infra/registry/memory.py new file mode 100644 index 0000000000..c89d5eb0c6 --- /dev/null +++ b/sdk/python/feast/infra/registry/memory.py @@ -0,0 +1,661 @@ +import copy + +from typing import Optional, Dict, List, Any, Tuple, Union +from pathlib import Path +from datetime import datetime + +from feast.base_feature_view import BaseFeatureView +from feast.data_source import DataSource +from feast.entity import Entity +from feast.feature_service import FeatureService +from feast.feature_view import FeatureView, BaseFeatureView +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.stream_feature_view import StreamFeatureView +from feast.request_feature_view import RequestFeatureView +from feast.repo_config import RegistryConfig + +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.registry.exceptions import RegistryError +from feast.errors import ( + ConflictingFeatureViewNames, + DataSourceNotFoundException, + EntityNotFoundException, + FeatureServiceNotFoundException, + FeatureViewNotFoundException, + ValidationReferenceNotFound, +) +from feast.saved_dataset import SavedDataset, ValidationReference + + +def project_key(project: str, key: str) -> str: + return f"{project}:{key}" + +def invert_projected_key(projected_key: str) -> Tuple[str, str]: + s = projected_key.split(":") + if len(s) != 2: + raise RegistryError(f"Invalid projected key {projected_key}.") + return s[0], s[1] + + +def list_registry_dict(project: str, registry: Dict[str, Any]) -> List[Any]: + return [ + copy.copy(v) for k, v in registry.items() if invert_projected_key(k)[0] == project + ] + + +class MemoryRegistry(BaseRegistry): + def __init__( + self, + x: Optional[RegistryConfig], + y: Optional[Path], + z: bool = False + ) -> None: + self.entities: Dict[str, Entity] = {} + + self.data_sources: Dict[str, DataSource] = {} + self.feature_services: Dict[str, FeatureService] = {} + + self.stream_feature_views: Dict[str, StreamFeatureView] = {} + self.feature_views: Dict[str, FeatureView] = {} + self.on_demand_feature_views: Dict[str, OnDemandFeatureView] = {} + self.request_feature_views: Dict[str, RequestFeatureView] = {} + + self.saved_datasets: Dict[str, SavedDataset] = {} + + self.fv_registries = [ + self.stream_feature_views, + self.feature_views, + self.on_demand_feature_views, + self.request_feature_views + ] + + # TODO: call `feast apply` here? 
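# --- Illustration only, not part of the patch ------------------------------------------------
# Each dict above is flat and keyed by a "projected key", so one MemoryRegistry instance can
# hold resources for several Feast projects at once. A minimal sketch of the module-level
# helpers defined near the top of this file:
#
#     project_key("demo", "driver_hourly_stats")
#     # -> "demo:driver_hourly_stats"
#     invert_projected_key("demo:driver_hourly_stats")
#     # -> ("demo", "driver_hourly_stats")
#     invert_projected_key("not-a-projected-key")
#     # -> raises RegistryError, since the key must contain exactly one ":"
#       (which also means resource names themselves must not contain ":")
#
# list_registry_dict(project, registry) then keeps only the entries whose decoded project
# matches and returns shallow copies, so list results are not references into the registry's
# own stored objects.
# ----------------------------------------------------------------------------------------------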
may cause infinite loop + + def _fv_registry(self, feature_view: BaseFeatureView) -> Dict[str, BaseFeatureView]: + if isinstance(feature_view, StreamFeatureView): + return self.stream_feature_views + if isinstance(feature_view, FeatureView): + return self.feature_views + if isinstance(feature_view, OnDemandFeatureView): + return self.on_demand_feature_views + if isinstance(feature_view, RequestFeatureView): + return self.request_feature_views + raise RegistryError(f"Unknown feature view type {feature_view}") + + def enter_apply_context(self): + pass + + def exit_apply_context(self): + pass + + def apply_entity(self, entity: Entity, project: str, commit: bool = True): + """ + Registers a single entity with Feast + + Args: + entity: Entity that will be registered + project: Feast project that this entity belongs to + commit: Whether the change should be persisted immediately + """ + entity.is_valid() + now = datetime.utcnow() + if not entity.created_timestamp: + entity.created_timestamp = now + entity.last_updated_timestamp = now + + key = project_key(project, entity.name) + if key in self.entities: + raise RegistryError(f"Duplicate entity {entity.name} for project {project}.") + self.entities[key] = entity + + def delete_entity(self, name: str, project: str, commit: bool = True): + """ + Deletes an entity or raises an exception if not found. + + Args: + name: Name of entity + project: Feast project that this entity belongs to + commit: Whether the change should be persisted immediately + """ + key = project_key(project, name) + if key not in self.entities: + raise RegistryError(f"Cannot delete unknown entity {name} for project {project}.") + del self.entities[key] + + def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Entity: + """ + Retrieves an entity. + + Args: + name: Name of entity + project: Feast project that this entity belongs to + allow_cache: Whether to allow returning this entity from a cached registry + + Returns: + Returns either the specified entity, or raises an exception if + none is found + """ + key = project_key(project, name) + if key not in self.entities: + raise RegistryError(f"Cannot retrieve unknown entity {name} for project {project}.") + return copy.copy(self.entities[key]) + + def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]: + """ + Retrieve a list of entities from the registry + + Args: + allow_cache: Whether to allow returning entities from a cached registry + project: Filter entities based on project name + + Returns: + List of entities + """ + return list_registry_dict(project, self.entities) + + # Data source operations + def apply_data_source( + self, data_source: DataSource, project: str, commit: bool = True + ): + """ + Registers a single data source with Feast + + Args: + data_source: A data source that will be registered + project: Feast project that this data source belongs to + commit: Whether to immediately commit to the registry + """ + key = project_key(project, data_source.name) + if key in self.data_sources: + raise RegistryError(f"Duplicate data source {data_source.name} for project {project}.") + self.data_sources[key] = data_source + + def delete_data_source(self, name: str, project: str, commit: bool = True): + """ + Deletes a data source or raises an exception if not found. 
+ + Args: + name: Name of data source + project: Feast project that this data source belongs to + commit: Whether the change should be persisted immediately + """ + key = project_key(project, name) + if key not in self.data_sources: + raise RegistryError(f"Cannot delete unknown data source {name} for project {project}.") + del self.data_sources[key] + + def get_data_source( + self, name: str, project: str, allow_cache: bool = False + ) -> DataSource: + """ + Retrieves a data source. + + Args: + name: Name of data source + project: Feast project that this data source belongs to + allow_cache: Whether to allow returning this data source from a cached registry + + Returns: + Returns either the specified data source, or raises an exception if none is found + """ + key = project_key(project, name) + if key not in self.data_sources: + raise RegistryError(f"Cannot retrieve unknown data source {name} for project {project}.") + return copy.copy(self.data_sources[key]) + + def list_data_sources( + self, project: str, allow_cache: bool = False + ) -> List[DataSource]: + """ + Retrieve a list of data sources from the registry + + Args: + project: Filter data source based on project name + allow_cache: Whether to allow returning data sources from a cached registry + + Returns: + List of data sources + """ + return list_registry_dict(project, self.data_sources) + + # Feature service operations + def apply_feature_service( + self, feature_service: FeatureService, project: str, commit: bool = True + ): + """ + Registers a single feature service with Feast + + Args: + feature_service: A feature service that will be registered + project: Feast project that this entity belongs to + """ + key = project_key(project, feature_service.name) + if key in self.feature_services: + raise RegistryError(f"Duplicate feature service {feature_service.name} for project {project}.") + self.feature_services[key] = feature_service + + def delete_feature_service(self, name: str, project: str, commit: bool = True): + """ + Deletes a feature service or raises an exception if not found. + + Args: + name: Name of feature service + project: Feast project that this feature service belongs to + commit: Whether the change should be persisted immediately + """ + key = project_key(project, name) + if key not in self.feature_services: + raise RegistryError(f"Cannot delete unknown feature service {name} for project {project}.") + del self.feature_services[key] + + def get_feature_service( + self, name: str, project: str, allow_cache: bool = False + ) -> FeatureService: + """ + Retrieves a feature service. 
+ + Args: + name: Name of feature service + project: Feast project that this feature service belongs to + allow_cache: Whether to allow returning this feature service from a cached registry + + Returns: + Returns either the specified feature service, or raises an exception if + none is found + """ + key = project_key(project, name) + if key not in self.feature_services: + raise RegistryError(f"Cannot retrieve unknown feature service {name} for project {project}.") + return copy.copy(self.feature_services[key]) + + def list_feature_services( + self, project: str, allow_cache: bool = False + ) -> List[FeatureService]: + """ + Retrieve a list of feature services from the registry + + Args: + allow_cache: Whether to allow returning entities from a cached registry + project: Filter entities based on project name + + Returns: + List of feature services + """ + return list_registry_dict(project, self.feature_services) + + def apply_feature_view( + self, feature_view: BaseFeatureView, project: str, commit: bool = True + ): + """ + Registers a single feature view with Feast + + Args: + feature_view: Feature view that will be registered + project: Feast project that this feature view belongs to + commit: Whether the change should be persisted immediately + """ + registry = self._fv_registry(feature_view) + key = project_key(project, feature_view.name) + if key in registry: + raise RegistryError(f"Duplicate feature view {feature_view.name} for project {project}.") + registry[key] = feature_view + + def delete_feature_view(self, name: str, project: str, commit: bool = True): + """ + Deletes a feature view or raises an exception if not found. + + Args: + name: Name of feature view + project: Feast project that this feature view belongs to + commit: Whether the change should be persisted immediately + """ + key = project_key(project, name) + for registry in self.fv_registries: + if key in registry: + del registry[key] + return + raise RegistryError(f"Cannot delete unknown feature view {name} for project {project}.") + + def get_stream_feature_view( + self, name: str, project: str, allow_cache: bool = False + ): + """ + Retrieves a stream feature view. + + Args: + name: Name of stream feature view + project: Feast project that this feature view belongs to + allow_cache: Allow returning feature view from the cached registry + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + key = project_key(project, name) + if key not in self.stream_feature_views: + raise RegistryError(f"Cannot retrieve unknown stream feature view {name} for project {project}") + return copy.copy(self.stream_feature_views[key]) + + def list_stream_feature_views( + self, project: str, allow_cache: bool = False, ignore_udfs: bool = False, + ) -> List[StreamFeatureView]: + """ + Retrieve a list of stream feature views from the registry + + Args: + project: Filter stream feature views based on project name + allow_cache: Whether to allow returning stream feature views from a cached registry + ignore_udfs: Whether a feast apply operation is being executed. Determines whether environment + sensitive commands, such as dill.loads(), are skipped and 'None' is set as their results. + Returns: + List of stream feature views + """ + return list_registry_dict(project, self.stream_feature_views) + + def get_on_demand_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> OnDemandFeatureView: + """ + Retrieves an on demand feature view. 
+ + Args: + name: Name of on demand feature view + project: Feast project that this on demand feature view belongs to + allow_cache: Whether to allow returning this on demand feature view from a cached registry + + Returns: + Returns either the specified on demand feature view, or raises an exception if + none is found + """ + key = project_key(project, name) + if key not in self.on_demand_feature_views: + raise RegistryError(f"Cannot retrieve unknown on demand feature view {name} for project {project}") + return copy.copy(self.on_demand_feature_views[key]) + + def list_on_demand_feature_views( + self, project: str, allow_cache: bool = False, ignore_udfs: bool = False + ) -> List[OnDemandFeatureView]: + """ + Retrieve a list of on demand feature views from the registry + + Args: + project: Filter on demand feature views based on project name + allow_cache: Whether to allow returning on demand feature views from a cached registry + ignore_udfs: Whether a feast apply operation is being executed. Determines whether environment + sensitive commands, such as dill.loads(), are skipped and 'None' is set as their results. + Returns: + List of on demand feature views + """ + return list_registry_dict(project, self.on_demand_feature_views) + + def get_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> FeatureView: + """ + Retrieves a feature view. + + Args: + name: Name of feature view + project: Feast project that this feature view belongs to + allow_cache: Allow returning feature view from the cached registry + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + key = project_key(project, name) + if key not in self.feature_views: + raise RegistryError(f"Cannot retrieve unknown feature view {name} for project {project}") + return copy.copy(self.feature_views[key]) + + def list_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[FeatureView]: + """ + Retrieve a list of feature views from the registry + + Args: + allow_cache: Allow returning feature views from the cached registry + project: Filter feature views based on project name + + Returns: + List of feature views + """ + return list_registry_dict(project, self.feature_views) + + def get_request_feature_view(self, name: str, project: str) -> RequestFeatureView: + """ + Retrieves a request feature view. 
+ + Args: + name: Name of request feature view + project: Feast project that this feature view belongs to + allow_cache: Allow returning feature view from the cached registry + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + key = project_key(project, name) + if key not in self.request_feature_views: + raise RegistryError(f"Cannot retrieve unknown request feature view {name} for project {project}") + return copy.copy(self.request_feature_views[key]) + + def list_request_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[RequestFeatureView]: + """ + Retrieve a list of request feature views from the registry + + Args: + allow_cache: Allow returning feature views from the cached registry + project: Filter feature views based on project name + + Returns: + List of request feature views + """ + return list_registry_dict(project, self.request_feature_views) + + def apply_materialization( + self, + feature_view: FeatureView, + project: str, + start_date: datetime, + end_date: datetime, + commit: bool = True, + ): + key = project_key(project, feature_view.name) + for registry in [self.feature_views, self.stream_feature_views]: + if key in registry: + fv = registry[key] + fv.materialization_intervals.append((start_date, end_date)) + fv.last_updated_timestamp = datetime.utcnow() + return + raise FeatureViewNotFoundException(feature_view.name, project) + + def apply_saved_dataset( + self, + saved_dataset: SavedDataset, + project: str, + commit: bool = True, + ): + """ + Stores a saved dataset metadata with Feast + + Args: + saved_dataset: SavedDataset that will be added / updated to registry + project: Feast project that this dataset belongs to + commit: Whether the change should be persisted immediately + """ + key = project_key(project, saved_dataset.name) + if key in self.saved_datasets: + raise RegistryError(f"Duplicate saved dataset {saved_dataset.name} for project {project}.") + self.saved_datasets[key] = saved_dataset + + def get_saved_dataset( + self, name: str, project: str, allow_cache: bool = False + ) -> SavedDataset: + """ + Retrieves a saved dataset. + + Args: + name: Name of dataset + project: Feast project that this dataset belongs to + allow_cache: Whether to allow returning this dataset from a cached registry + + Returns: + Returns either the specified SavedDataset, or raises an exception if + none is found + """ + pass + + def delete_saved_dataset(self, name: str, project: str, allow_cache: bool = False): + """ + Delete a saved dataset. 
+ + Args: + name: Name of dataset + project: Feast project that this dataset belongs to + allow_cache: Whether to allow returning this dataset from a cached registry + + Returns: + Returns either the specified SavedDataset, or raises an exception if + none is found + """ + + def list_saved_datasets( + self, project: str, allow_cache: bool = False + ) -> List[SavedDataset]: + """ + Retrieves a list of all saved datasets in specified project + + Args: + project: Feast project + allow_cache: Whether to allow returning this dataset from a cached registry + + Returns: + Returns the list of SavedDatasets + """ + pass + + def apply_validation_reference( + self, + validation_reference: ValidationReference, + project: str, + commit: bool = True, + ): + """ + Persist a validation reference + + Args: + validation_reference: ValidationReference that will be added / updated to registry + project: Feast project that this dataset belongs to + commit: Whether the change should be persisted immediately + """ + pass + + def delete_validation_reference(self, name: str, project: str, commit: bool = True): + """ + Deletes a validation reference or raises an exception if not found. + + Args: + name: Name of validation reference + project: Feast project that this object belongs to + commit: Whether the change should be persisted immediately + """ + pass + + def get_validation_reference( + self, name: str, project: str, allow_cache: bool = False + ) -> ValidationReference: + """ + Retrieves a validation reference. + + Args: + name: Name of dataset + project: Feast project that this dataset belongs to + allow_cache: Whether to allow returning this dataset from a cached registry + + Returns: + Returns either the specified ValidationReference, or raises an exception if + none is found + """ + pass + + def list_validation_references( + self, project: str, allow_cache: bool = False + ) -> List[ValidationReference]: + """ + Retrieve a list of validation references from the registry + + Args: + allow_cache: Allow returning feature views from the cached registry + project: Filter feature views based on project name + + Returns: + List of request feature views + """ + pass + + def list_project_metadata( + self, project: str, allow_cache: bool = False + ) -> List[ProjectMetadata]: + """ + Retrieves project metadata + + Args: + project: Filter metadata based on project name + allow_cache: Allow returning feature views from the cached registry + + Returns: + List of project metadata + """ + pass + + def update_infra(self, infra: Infra, project: str, commit: bool = True): + """ + Updates the stored Infra object. + + Args: + infra: The new Infra object to be stored. + project: Feast project that the Infra object refers to + commit: Whether the change should be persisted immediately + """ + pass + + def get_infra(self, project: str, allow_cache: bool = False) -> Infra: + """ + Retrieves the stored Infra object. + + Args: + project: Feast project that the Infra object refers to + allow_cache: Whether to allow returning this entity from a cached registry + + Returns: + The stored Infra object. 
+ """ + pass + + def apply_user_metadata( + self, + project: str, + feature_view: BaseFeatureView, + metadata_bytes: Optional[bytes], + ): + pass + + def get_user_metadata( + self, project: str, feature_view: BaseFeatureView + ) -> Optional[bytes]: + pass + + def proto(self) -> RegistryProto: + pass + + def commit(self): + pass + + def refresh(self, project: Optional[str] = None): + """Refreshes the state of the registry cache by fetching the registry state from the remote registry store.""" + pass diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 57016e1d5e..ede3a3b5e0 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -13,7 +13,7 @@ String, Table, create_engine, - delete, + delete,'' insert, select, update, From 5bfab4d3232387425a970b92d48fbc523a1c466e Mon Sep 17 00:00:00 2001 From: Ross Briden Date: Mon, 9 Oct 2023 23:13:26 -0700 Subject: [PATCH 2/9] undo error implementation ckpt 1 better type checking plus some bug fixes more impl --- sdk/python/feast/errors.py | 30 ++ sdk/python/feast/feature_store.py | 17 +- sdk/python/feast/infra/registry/exceptions.py | 2 - sdk/python/feast/infra/registry/memory.py | 386 ++++++++++++------ sdk/python/feast/infra/registry/sql.py | 2 +- sdk/python/feast/repo_operations.py | 18 +- 6 files changed, 309 insertions(+), 146 deletions(-) delete mode 100644 sdk/python/feast/infra/registry/exceptions.py diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 042a3622a9..66e1e4dd1b 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -5,6 +5,7 @@ from feast.field import Field + class DataSourceNotFoundException(Exception): def __init__(self, path): super().__init__( @@ -118,6 +119,13 @@ def __init__(self, name: str, project: str): ) +class DuplicateValidationReference(Exception): + def __init__(self, name: str, project) -> None: + super(DuplicateValidationReference, self).__init__( + f"Duplication validation reference {name} for project {project}." + ) + + class FeastProviderLoginError(Exception): """Error class that indicates a user has not authenticated with their provider.""" @@ -411,3 +419,25 @@ def __init__(self): class PushSourceNotFoundException(Exception): def __init__(self, push_source_name: str): super().__init__(f"Unable to find push source '{push_source_name}'.") + + +class RegistryNotBuiltException(Exception): + def __init__(self, registry_name: str) -> None: + super(RegistryNotBuiltException, self).__init__(f"Registry {registry_name} must be built before being queried.") + + +class EntityNameCollisionException(Exception): + def __init__(self, entity_name: str, project: str) -> None: + super(EntityNameCollisionException, self).__init__(f"Duplicate entity {entity_name} for project {project}.") + + +class FeatureServiceNameCollisionException(Exception): + def __init__(self, service_name: str, project: str) -> None: + super(FeatureServiceNameCollisionException, self).__init__( + f"Duplicate feature service {service_name} for project {project}." 
+ ) + + +class MissingInfraObjectException(Exception): + def __init__(self, project: str) -> None: + super(MissingInfraObjectException, self).__init__(f"No infra objects found for project {project}.") diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 1596a3686c..827bc7e7aa 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -81,6 +81,7 @@ from feast.infra.registry.base_registry import BaseRegistry from feast.infra.registry.registry import Registry from feast.infra.registry.sql import SqlRegistry +from feast.infra.registry.memory import MemoryRegistry from feast.on_demand_feature_view import OnDemandFeatureView from feast.online_response import OnlineResponse from feast.protos.feast.serving.ServingService_pb2 import ( @@ -167,16 +168,24 @@ def __init__( self.repo_path, utils.get_default_yaml_file_path(self.repo_path) ) + self._provider = get_provider(self.config) + + # RB: ordering matters here because `apply_total` assumes a constructed `FeatureStore` instance registry_config = self.config.get_registry_config() if registry_config.registry_type == "sql": self._registry = SqlRegistry(registry_config, None, is_feast_apply=is_feast_apply) + elif registry_config.registry_type == "memory": + from feast.repo_operations import apply_total + self._registry = MemoryRegistry(registry_config, repo_path, is_feast_apply=is_feast_apply) + + # RB: MemoryRegistry is stateless, meaning we'll need to call `apply` with each new FeatureStore instance + if not is_feast_apply: + apply_total(repo_config=self.config, repo_path=self.repo_path, skip_source_validation=False, store=self) else: r = Registry(registry_config, repo_path=self.repo_path) r._initialize_registry(self.config.project) self._registry = r - self._provider = get_provider(self.config) - @log_exceptions def version(self) -> str: """Returns the version of the current Feast SDK/CLI.""" @@ -212,6 +221,10 @@ def refresh_registry(self): downloaded synchronously, which may increase latencies if the triggering method is get_online_features(). 
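# --- Illustrative sketch, not part of the patch ----------------------------------------------
# With the FeatureStore wiring above, a repo opts into the in-memory registry through its
# registry config. The feature_store.yaml layout below is an assumption for illustration;
# the only thing the code checks is registry_type == "memory":
#
#     project: my_project
#     provider: local
#     registry:
#       registry_type: memory
#
# Because MemoryRegistry keeps no state on disk, FeatureStore.__init__ re-runs apply_total()
# on every non-apply instantiation, rebuilding the in-memory registry from the repo each time
# a new FeatureStore is constructed.
# ----------------------------------------------------------------------------------------------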
""" registry_config = self.config.get_registry_config() + + # RB: MemoryRegistry is a cache + if registry_config.registry_type == "memory": + return registry = Registry(registry_config, repo_path=self.repo_path) registry.refresh(self.config.project) diff --git a/sdk/python/feast/infra/registry/exceptions.py b/sdk/python/feast/infra/registry/exceptions.py deleted file mode 100644 index ffdf5c54aa..0000000000 --- a/sdk/python/feast/infra/registry/exceptions.py +++ /dev/null @@ -1,2 +0,0 @@ -class RegistryError(Exception): - pass diff --git a/sdk/python/feast/infra/registry/memory.py b/sdk/python/feast/infra/registry/memory.py index c89d5eb0c6..8d7737de96 100644 --- a/sdk/python/feast/infra/registry/memory.py +++ b/sdk/python/feast/infra/registry/memory.py @@ -1,10 +1,12 @@ import copy - +import uuid from typing import Optional, Dict, List, Any, Tuple, Union from pathlib import Path from datetime import datetime -from feast.base_feature_view import BaseFeatureView +from feast import usage + +# Feast Resources from feast.data_source import DataSource from feast.entity import Entity from feast.feature_service import FeatureService @@ -12,33 +14,62 @@ from feast.on_demand_feature_view import OnDemandFeatureView from feast.stream_feature_view import StreamFeatureView from feast.request_feature_view import RequestFeatureView +from feast.project_metadata import ProjectMetadata +from feast.infra.infra_object import Infra +from feast.saved_dataset import SavedDataset, ValidationReference + from feast.repo_config import RegistryConfig from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.infra.registry.base_registry import BaseRegistry -from feast.infra.registry.exceptions import RegistryError + from feast.errors import ( ConflictingFeatureViewNames, - DataSourceNotFoundException, - EntityNotFoundException, FeatureServiceNotFoundException, + FeatureServiceNameCollisionException, FeatureViewNotFoundException, ValidationReferenceNotFound, + RegistryNotBuiltException, + EntityNameCollisionException, + EntityNotFoundException, + DataSourceRepeatNamesException, + DataSourceObjectNotFoundException, + SavedDatasetNotFound, + DuplicateValidationReference, + MissingInfraObjectException ) -from feast.saved_dataset import SavedDataset, ValidationReference + +TimeDependentObject = Union[ + BaseFeatureView, + FeatureView, + StreamFeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + SavedDataset +] + +FeastResource = Union[ + TimeDependentObject, DataSource, SavedDataset, ValidationReference, ProjectMetadata, Infra +] def project_key(project: str, key: str) -> str: + # maps (project, key) pair to a single string, called a `projected key` return f"{project}:{key}" + def invert_projected_key(projected_key: str) -> Tuple[str, str]: + # inverse of `project_key` s = projected_key.split(":") if len(s) != 2: - raise RegistryError(f"Invalid projected key {projected_key}.") + raise ValueError(f"Invalid projected key {projected_key}. 
Key must follow the format `project:name`.") return s[0], s[1] def list_registry_dict(project: str, registry: Dict[str, Any]) -> List[Any]: + # returns a shallow copy of all values in registry that belong to `project` return [ copy.copy(v) for k, v in registry.items() if invert_projected_key(k)[0] == project ] @@ -46,33 +77,44 @@ def list_registry_dict(project: str, registry: Dict[str, Any]) -> List[Any]: class MemoryRegistry(BaseRegistry): def __init__( - self, - x: Optional[RegistryConfig], - y: Optional[Path], - z: bool = False + self, + repo_config: Optional[RegistryConfig], + repo_path: Optional[Path], + is_feast_apply: bool = False ) -> None: + + # unused + self.repo_config = repo_config + self.repo_path = repo_path + + # flag signaling that the registry has been populated; this should be set after a Feast apply operation + self.is_built = False + self.is_feast_apply = is_feast_apply + + self.infra: Dict[str, Infra] = {} + self.entities: Dict[str, Entity] = {} + self.feature_services: Dict[str, FeatureService] = {} + self.project_metadata: Dict[str, ProjectMetadata] = {} + self.validation_references: Dict[str, ValidationReference] = {} self.data_sources: Dict[str, DataSource] = {} - self.feature_services: Dict[str, FeatureService] = {} + self.saved_datasets: Dict[str, SavedDataset] = {} self.stream_feature_views: Dict[str, StreamFeatureView] = {} self.feature_views: Dict[str, FeatureView] = {} self.on_demand_feature_views: Dict[str, OnDemandFeatureView] = {} self.request_feature_views: Dict[str, RequestFeatureView] = {} - self.saved_datasets: Dict[str, SavedDataset] = {} - - self.fv_registries = [ + self.feature_view_registries = [ self.stream_feature_views, self.feature_views, self.on_demand_feature_views, self.request_feature_views ] - # TODO: call `feast apply` here? 
may cause infinite loop - - def _fv_registry(self, feature_view: BaseFeatureView) -> Dict[str, BaseFeatureView]: + def _get_feature_view_registry(self, feature_view: BaseFeatureView) -> Dict[str, BaseFeatureView]: + # returns the sub-registry that aligns with `type(feature_view)`, or an exception if the type is unknown if isinstance(feature_view, StreamFeatureView): return self.stream_feature_views if isinstance(feature_view, FeatureView): @@ -81,13 +123,52 @@ def _fv_registry(self, feature_view: BaseFeatureView) -> Dict[str, BaseFeatureVi return self.on_demand_feature_views if isinstance(feature_view, RequestFeatureView): return self.request_feature_views - raise RegistryError(f"Unknown feature view type {feature_view}") + raise FeatureViewNotFoundException(feature_view) + + def _maybe_init_project_metadata(self, project: str) -> None: + if project not in self.project_metadata: + self.project_metadata[project] = ProjectMetadata(project_name=project) + usage.set_current_project_uuid(self.project_metadata[project].project_uuid) + + def _delete_object( + self, name: str, project: str, registry: Dict[str, FeastResource], on_miss_exc: Exception + ) -> None: + self._maybe_init_project_metadata(project) + + # deletes a key from `registry`, or `on_miss_exc` is raised if the object doesn't exist in the registry + key = project_key(project, name) + if key not in registry: + raise on_miss_exc + del registry[key] + + def _get_object( + self, name: str, project: str, registry: Dict[str, FeastResource], on_miss_exc: Exception + ) -> FeastResource: + self._maybe_init_project_metadata(project) + + # returns a `FeastResource` from the registry, or `on_miss_exc` if the object doesn't exist in the registry + if not self.is_built: + raise RegistryNotBuiltException(registry_name=self.__class__.__name__) + key = project_key(project, name) + if key not in registry: + raise on_miss_exc + return copy.copy(registry[key]) + + def _update_object_ts(self, obj: TimeDependentObject) -> TimeDependentObject: + # updates the `created_timestamp` and `last_updated_timestamp` attributes of a `TimeDependentObject` + now = datetime.utcnow() + if not obj.created_timestamp: + obj.created_timestamp = now + obj.last_updated_timestamp = now + return obj def enter_apply_context(self): - pass + self.is_feast_apply = True def exit_apply_context(self): - pass + self.is_feast_apply = False + # if this flag is not set, `get_*` operations of the registry will fail + self.is_built = True def apply_entity(self, entity: Entity, project: str, commit: bool = True): """ @@ -98,16 +179,13 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True): project: Feast project that this entity belongs to commit: Whether the change should be persisted immediately """ - entity.is_valid() - now = datetime.utcnow() - if not entity.created_timestamp: - entity.created_timestamp = now - entity.last_updated_timestamp = now + self._maybe_init_project_metadata(project) + entity.is_valid() key = project_key(project, entity.name) if key in self.entities: - raise RegistryError(f"Duplicate entity {entity.name} for project {project}.") - self.entities[key] = entity + raise EntityNameCollisionException(entity.name, project) + self.entities[key] = copy.copy(self._update_object_ts(obj=entity)) def delete_entity(self, name: str, project: str, commit: bool = True): """ @@ -118,10 +196,9 @@ def delete_entity(self, name: str, project: str, commit: bool = True): project: Feast project that this entity belongs to commit: Whether the change should be 
persisted immediately """ - key = project_key(project, name) - if key not in self.entities: - raise RegistryError(f"Cannot delete unknown entity {name} for project {project}.") - del self.entities[key] + self._delete_object( + name=name, project=project, registry=self.entities, on_miss_exc=EntityNotFoundException(name, project) + ) def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Entity: """ @@ -136,10 +213,9 @@ def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Enti Returns either the specified entity, or raises an exception if none is found """ - key = project_key(project, name) - if key not in self.entities: - raise RegistryError(f"Cannot retrieve unknown entity {name} for project {project}.") - return copy.copy(self.entities[key]) + return self._get_object( + name=name, project=project, registry=self.entities, on_miss_exc=EntityNotFoundException(name, project) + ) def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]: """ @@ -156,7 +232,7 @@ def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity] # Data source operations def apply_data_source( - self, data_source: DataSource, project: str, commit: bool = True + self, data_source: DataSource, project: str, commit: bool = True ): """ Registers a single data source with Feast @@ -166,10 +242,12 @@ def apply_data_source( project: Feast project that this data source belongs to commit: Whether to immediately commit to the registry """ + self._maybe_init_project_metadata(project) + key = project_key(project, data_source.name) if key in self.data_sources: - raise RegistryError(f"Duplicate data source {data_source.name} for project {project}.") - self.data_sources[key] = data_source + raise DataSourceRepeatNamesException(data_source.name) + self.data_sources[key] = copy.copy(data_source) def delete_data_source(self, name: str, project: str, commit: bool = True): """ @@ -180,13 +258,11 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): project: Feast project that this data source belongs to commit: Whether the change should be persisted immediately """ - key = project_key(project, name) - if key not in self.data_sources: - raise RegistryError(f"Cannot delete unknown data source {name} for project {project}.") - del self.data_sources[key] + exc = DataSourceObjectNotFoundException(name=name, project=project) + self._delete_object(name=name, project=project, registry=self.data_sources, on_miss_exc=exc) def get_data_source( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> DataSource: """ Retrieves a data source. 
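# --- Illustration only, not part of the patch ------------------------------------------------
# After this refactor every read and delete funnels through the shared _get_object /
# _delete_object helpers: keys are project-scoped, reads return shallow copies, and a miss
# raises whatever exception the caller supplies, for example:
#
#     registry.get_entity("driver", project="demo")
#     # -> EntityNotFoundException("driver", "demo") if "demo:driver" was never applied
#     registry.delete_data_source("rides", project="demo")
#     # -> DataSourceObjectNotFoundException if "demo:rides" is absent
#
# _get_object also refuses reads until a feast apply has marked the registry as built
# (RegistryNotBuiltException); writes and deletes are allowed at any time.
# ----------------------------------------------------------------------------------------------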
@@ -199,13 +275,11 @@ def get_data_source( Returns: Returns either the specified data source, or raises an exception if none is found """ - key = project_key(project, name) - if key not in self.data_sources: - raise RegistryError(f"Cannot retrieve unknown data source {name} for project {project}.") - return copy.copy(self.data_sources[key]) + exc = DataSourceObjectNotFoundException(name=name, project=project) + return self._get_object(name=name, project=project, registry=self.data_sources, on_miss_exc=exc) def list_data_sources( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[DataSource]: """ Retrieve a list of data sources from the registry @@ -221,7 +295,7 @@ def list_data_sources( # Feature service operations def apply_feature_service( - self, feature_service: FeatureService, project: str, commit: bool = True + self, feature_service: FeatureService, project: str, commit: bool = True ): """ Registers a single feature service with Feast @@ -230,10 +304,12 @@ def apply_feature_service( feature_service: A feature service that will be registered project: Feast project that this entity belongs to """ + self._maybe_init_project_metadata(project) + key = project_key(project, feature_service.name) if key in self.feature_services: - raise RegistryError(f"Duplicate feature service {feature_service.name} for project {project}.") - self.feature_services[key] = feature_service + raise FeatureServiceNameCollisionException(service_name=feature_service.name, project=project) + self.feature_services[key] = copy.copy(self._update_object_ts(feature_service)) def delete_feature_service(self, name: str, project: str, commit: bool = True): """ @@ -244,10 +320,8 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): project: Feast project that this feature service belongs to commit: Whether the change should be persisted immediately """ - key = project_key(project, name) - if key not in self.feature_services: - raise RegistryError(f"Cannot delete unknown feature service {name} for project {project}.") - del self.feature_services[key] + exc = FeatureServiceNotFoundException(name=name, project=project) + self._delete_object(name=name, project=project, registry=self.feature_services, on_miss_exc=exc) def get_feature_service( self, name: str, project: str, allow_cache: bool = False @@ -264,13 +338,11 @@ def get_feature_service( Returns either the specified feature service, or raises an exception if none is found """ - key = project_key(project, name) - if key not in self.feature_services: - raise RegistryError(f"Cannot retrieve unknown feature service {name} for project {project}.") - return copy.copy(self.feature_services[key]) + exc = FeatureServiceNotFoundException(name=name, project=project) + return self._get_object(name=name, project=project, registry=self.feature_services, on_miss_exc=exc) def list_feature_services( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[FeatureService]: """ Retrieve a list of feature services from the registry @@ -285,7 +357,7 @@ def list_feature_services( return list_registry_dict(project, self.feature_services) def apply_feature_view( - self, feature_view: BaseFeatureView, project: str, commit: bool = True + self, feature_view: BaseFeatureView, project: str, commit: bool = True ): """ Registers a single feature view with Feast @@ -295,11 +367,14 @@ def apply_feature_view( project: Feast project that this feature view belongs to commit: 
Whether the change should be persisted immediately """ - registry = self._fv_registry(feature_view) + self._maybe_init_project_metadata(project) + feature_view.ensure_valid() + + registry = self._get_feature_view_registry(feature_view) key = project_key(project, feature_view.name) if key in registry: - raise RegistryError(f"Duplicate feature view {feature_view.name} for project {project}.") - registry[key] = feature_view + raise ConflictingFeatureViewNames(feature_view.name) + registry[key] = copy.copy(self._update_object_ts(feature_view)) def delete_feature_view(self, name: str, project: str, commit: bool = True): """ @@ -311,15 +386,13 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): commit: Whether the change should be persisted immediately """ key = project_key(project, name) - for registry in self.fv_registries: + for registry in self.feature_view_registries: if key in registry: del registry[key] return - raise RegistryError(f"Cannot delete unknown feature view {name} for project {project}.") + raise FeatureViewNotFoundException(name=name, project=project) - def get_stream_feature_view( - self, name: str, project: str, allow_cache: bool = False - ): + def get_stream_feature_view(self, name: str, project: str, allow_cache: bool = False): """ Retrieves a stream feature view. @@ -332,13 +405,11 @@ def get_stream_feature_view( Returns either the specified feature view, or raises an exception if none is found """ - key = project_key(project, name) - if key not in self.stream_feature_views: - raise RegistryError(f"Cannot retrieve unknown stream feature view {name} for project {project}") - return copy.copy(self.stream_feature_views[key]) + exc = FeatureViewNotFoundException(name=name, project=project) + return self._get_object(name=name, project=project, registry=self.stream_feature_views, on_miss_exc=exc) def list_stream_feature_views( - self, project: str, allow_cache: bool = False, ignore_udfs: bool = False, + self, project: str, allow_cache: bool = False, ignore_udfs: bool = False ) -> List[StreamFeatureView]: """ Retrieve a list of stream feature views from the registry @@ -354,7 +425,7 @@ def list_stream_feature_views( return list_registry_dict(project, self.stream_feature_views) def get_on_demand_feature_view( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> OnDemandFeatureView: """ Retrieves an on demand feature view. 
@@ -368,13 +439,11 @@ def get_on_demand_feature_view( Returns either the specified on demand feature view, or raises an exception if none is found """ - key = project_key(project, name) - if key not in self.on_demand_feature_views: - raise RegistryError(f"Cannot retrieve unknown on demand feature view {name} for project {project}") - return copy.copy(self.on_demand_feature_views[key]) + exc = FeatureViewNotFoundException(name=name, project=project) + return self._get_object(name=name, project=project, registry=self.on_demand_feature_views, on_miss_exc=exc) def list_on_demand_feature_views( - self, project: str, allow_cache: bool = False, ignore_udfs: bool = False + self, project: str, allow_cache: bool = False, ignore_udfs: bool = False ) -> List[OnDemandFeatureView]: """ Retrieve a list of on demand feature views from the registry @@ -390,7 +459,7 @@ def list_on_demand_feature_views( return list_registry_dict(project, self.on_demand_feature_views) def get_feature_view( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> FeatureView: """ Retrieves a feature view. @@ -404,13 +473,11 @@ def get_feature_view( Returns either the specified feature view, or raises an exception if none is found """ - key = project_key(project, name) - if key not in self.feature_views: - raise RegistryError(f"Cannot retrieve unknown feature view {name} for project {project}") - return copy.copy(self.feature_views[key]) + exc = FeatureViewNotFoundException(name=name, project=project) + return self._get_object(name=name, project=project, registry=self.feature_views, on_miss_exc=exc) def list_feature_views( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[FeatureView]: """ Retrieve a list of feature views from the registry @@ -437,13 +504,11 @@ def get_request_feature_view(self, name: str, project: str) -> RequestFeatureVie Returns either the specified feature view, or raises an exception if none is found """ - key = project_key(project, name) - if key not in self.request_feature_views: - raise RegistryError(f"Cannot retrieve unknown request feature view {name} for project {project}") - return copy.copy(self.request_feature_views[key]) + exc = FeatureViewNotFoundException(name=name, project=project) + return self._get_object(name=name, project=project, registry=self.request_feature_views, on_miss_exc=exc) def list_request_feature_views( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[RequestFeatureView]: """ Retrieve a list of request feature views from the registry @@ -458,13 +523,15 @@ def list_request_feature_views( return list_registry_dict(project, self.request_feature_views) def apply_materialization( - self, - feature_view: FeatureView, - project: str, - start_date: datetime, - end_date: datetime, - commit: bool = True, + self, + feature_view: FeatureView, + project: str, + start_date: datetime, + end_date: datetime, + commit: bool = True, ): + self._maybe_init_project_metadata(project) + key = project_key(project, feature_view.name) for registry in [self.feature_views, self.stream_feature_views]: if key in registry: @@ -475,10 +542,10 @@ def apply_materialization( raise FeatureViewNotFoundException(feature_view.name, project) def apply_saved_dataset( - self, - saved_dataset: SavedDataset, - project: str, - commit: bool = True, + self, + saved_dataset: SavedDataset, + project: str, + commit: bool = True, ): """ Stores a saved 
dataset metadata with Feast @@ -488,13 +555,15 @@ def apply_saved_dataset( project: Feast project that this dataset belongs to commit: Whether the change should be persisted immediately """ + self._maybe_init_project_metadata(project) + key = project_key(project, saved_dataset.name) if key in self.saved_datasets: - raise RegistryError(f"Duplicate saved dataset {saved_dataset.name} for project {project}.") - self.saved_datasets[key] = saved_dataset + raise ValueError(f"Duplicate saved dataset {saved_dataset.name} for project {project}.") + self.saved_datasets[key] = copy.copy(self._update_object_ts(saved_dataset)) def get_saved_dataset( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> SavedDataset: """ Retrieves a saved dataset. @@ -508,7 +577,8 @@ def get_saved_dataset( Returns either the specified SavedDataset, or raises an exception if none is found """ - pass + exc = SavedDatasetNotFound(name=name, project=project) + return self._get_object(name=name, project=project, registry=self.saved_datasets, on_miss_exc=exc) def delete_saved_dataset(self, name: str, project: str, allow_cache: bool = False): """ @@ -523,9 +593,11 @@ def delete_saved_dataset(self, name: str, project: str, allow_cache: bool = Fals Returns either the specified SavedDataset, or raises an exception if none is found """ + exc = SavedDatasetNotFound(name=name, project=project) + self._delete_object(name=name, project=project, registry=self.saved_datasets, on_miss_exc=exc) def list_saved_datasets( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[SavedDataset]: """ Retrieves a list of all saved datasets in specified project @@ -537,13 +609,13 @@ def list_saved_datasets( Returns: Returns the list of SavedDatasets """ - pass + return list_registry_dict(project=project, registry=self.saved_datasets) def apply_validation_reference( - self, - validation_reference: ValidationReference, - project: str, - commit: bool = True, + self, + validation_reference: ValidationReference, + project: str, + commit: bool = True, ): """ Persist a validation reference @@ -553,7 +625,13 @@ def apply_validation_reference( project: Feast project that this dataset belongs to commit: Whether the change should be persisted immediately """ - pass + self._maybe_init_project_metadata(project) + + key = project_key(project, validation_reference.name) + if key in self.validation_references: + raise DuplicateValidationReference(name=validation_reference.name, project=project) + self.validation_references[key] = copy.copy(validation_reference) + def delete_validation_reference(self, name: str, project: str, commit: bool = True): """ @@ -564,10 +642,11 @@ def delete_validation_reference(self, name: str, project: str, commit: bool = Tr project: Feast project that this object belongs to commit: Whether the change should be persisted immediately """ - pass + exc = ValidationReferenceNotFound(name=name, project=project) + self._delete_object(name=name, project=project, registry=self.validation_references, on_miss_exc=exc) def get_validation_reference( - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> ValidationReference: """ Retrieves a validation reference. 
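# --- Illustration only, not part of the patch ------------------------------------------------
# Saved datasets and validation references follow the same collision / miss pattern as the
# other resources; `ref` below stands for any ValidationReference and is illustrative only:
#
#     registry.apply_validation_reference(ref, project="demo")
#     registry.apply_validation_reference(ref, project="demo")
#     # -> DuplicateValidationReference on the second call, because "demo:<ref.name>" exists
#     registry.get_validation_reference("missing", project="demo")
#     # -> ValidationReferenceNotFound
#
# Writes store copy.copy() of the caller's object and reads return copies as well, so the
# top-level stored instances are never handed back directly (nested mutable fields are still
# shared, since the copies are shallow).
# ----------------------------------------------------------------------------------------------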
@@ -581,10 +660,11 @@ def get_validation_reference( Returns either the specified ValidationReference, or raises an exception if none is found """ - pass + exc = ValidationReferenceNotFound(name=name, project=project) + return self._get_object(name=name, project=project, registry=self.validation_references, on_miss_exc=exc) def list_validation_references( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[ValidationReference]: """ Retrieve a list of validation references from the registry @@ -596,10 +676,10 @@ def list_validation_references( Returns: List of request feature views """ - pass + return list_registry_dict(project=project, registry=self.validation_references) def list_project_metadata( - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[ProjectMetadata]: """ Retrieves project metadata @@ -611,7 +691,7 @@ def list_project_metadata( Returns: List of project metadata """ - pass + return list_registry_dict(project=project, registry=self.project_metadata) def update_infra(self, infra: Infra, project: str, commit: bool = True): """ @@ -622,7 +702,7 @@ def update_infra(self, infra: Infra, project: str, commit: bool = True): project: Feast project that the Infra object refers to commit: Whether the change should be persisted immediately """ - pass + self.infra[project] = copy.copy(infra) def get_infra(self, project: str, allow_cache: bool = False) -> Infra: """ @@ -635,27 +715,61 @@ def get_infra(self, project: str, allow_cache: bool = False) -> Infra: Returns: The stored Infra object. """ - pass + if project not in self.infra: + raise MissingInfraObjectException(project) + return self.infra[project] - def apply_user_metadata( - self, - project: str, - feature_view: BaseFeatureView, - metadata_bytes: Optional[bytes], - ): + def apply_user_metadata(self, project: str, feature_view: BaseFeatureView, metadata_bytes: Optional[bytes]) -> None: + # not supported for in-memory objects pass - def get_user_metadata( - self, project: str, feature_view: BaseFeatureView - ) -> Optional[bytes]: + def get_user_metadata(self, project: str, feature_view: BaseFeatureView) -> Optional[bytes]: + # not supported for in-memory objects pass def proto(self) -> RegistryProto: + r = RegistryProto() + last_updated_timestamps = [] + for project in : + for lister, registry_proto_field in [ + (self.list_entities, r.entities), + (self.list_feature_views, r.feature_views), + (self.list_data_sources, r.data_sources), + (self.list_on_demand_feature_views, r.on_demand_feature_views), + (self.list_request_feature_views, r.request_feature_views), + (self.list_stream_feature_views, r.stream_feature_views), + (self.list_feature_services, r.feature_services), + (self.list_saved_datasets, r.saved_datasets), + (self.list_validation_references, r.validation_references), + (self.list_project_metadata, r.project_metadata), + ]: + lister_has_udf = lister in {self.list_on_demand_feature_views, self.list_stream_feature_views} + ignore_udfs = self._in_feast_apply_context + objs: List[Any] = lister(project, ignore_udfs=ignore_udfs) if lister_has_udf else lister( + project) # type: ignore + if objs: + registry_proto_field_data = [] + for obj in objs: + object_proto = obj.to_proto() + # Overriding project when missing, this is to handle failures when the registry is cached + if getattr(object_proto, 'spec', None) and object_proto.spec.project == '': + object_proto.spec.project = project + 
registry_proto_field_data.append(object_proto) + + registry_proto_field.extend(registry_proto_field_data) + r.infra.CopyFrom(self.get_infra(project).to_proto()) + last_updated_metadata = self._get_last_updated_metadata(project) + if last_updated_metadata is not None: + last_updated_timestamps.append(self._get_last_updated_metadata(project)) + + if last_updated_timestamps: + r.last_updated.FromDatetime(max(last_updated_timestamps)) + return r + + def commit(self) -> None: + # This is a noop because transactions are not supported pass - def commit(self): - pass - - def refresh(self, project: Optional[str] = None): - """Refreshes the state of the registry cache by fetching the registry state from the remote registry store.""" + def refresh(self, project: Optional[str] = None) -> None: + # This is a noop because MemoryRegistry is a cache pass diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index ede3a3b5e0..57016e1d5e 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -13,7 +13,7 @@ String, Table, create_engine, - delete,'' + delete, insert, select, update, diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index ac639d5461..9e9b3edad6 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -7,7 +7,7 @@ from importlib.abc import Loader from importlib.machinery import ModuleSpec from pathlib import Path -from typing import List, Set, Union +from typing import List, Set, Union, Tuple, Optional import click from click.exceptions import BadParameter @@ -16,6 +16,7 @@ from feast.batch_feature_view import BatchFeatureView from feast.data_source import DataSource, KafkaSource, KinesisSource from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add +from feast.infra.registry.base_registry import BaseRegistry from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_store import FeatureStore @@ -219,8 +220,11 @@ def plan(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool) click.echo(infra_diff.to_string()) -def _prepare_registry_and_repo(repo_config, repo_path, is_feast_apply=False): - store = FeatureStore(config=repo_config, is_feast_apply=is_feast_apply) +def _prepare_registry_and_repo( + repo_config: RepoConfig, repo_path: Path, is_feast_apply: bool = False, store: Optional[FeatureStore] = None +) -> Tuple[str, BaseRegistry, RepoContents, FeatureStore]: + if store is None: + store = FeatureStore(config=repo_config, is_feast_apply=is_feast_apply) project = store.project if not is_valid_name(project): print( @@ -331,9 +335,13 @@ def log_infra_changes( @log_exceptions_and_usage -def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): +def apply_total( + repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool, store: Optional[FeatureStore] = None +) -> None: os.chdir(repo_path) - project, registry, repo, store = _prepare_registry_and_repo(repo_config, repo_path, is_feast_apply=True) + project, registry, repo, store = _prepare_registry_and_repo( + repo_config, repo_path, is_feast_apply=True, store=store + ) apply_total_with_repo_instance( store, project, registry, repo, skip_source_validation ) From 2949d598464bd74ab7bd95c936af40080f03eabe Mon Sep 17 00:00:00 2001 From: Ross Briden Date: Tue, 10 Oct 2023 14:04:29 -0700 Subject: [PATCH 3/9] implement proto --- sdk/python/feast/infra/registry/memory.py | 23 
+++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/infra/registry/memory.py b/sdk/python/feast/infra/registry/memory.py index 8d7737de96..a5ff3c5865 100644 --- a/sdk/python/feast/infra/registry/memory.py +++ b/sdk/python/feast/infra/registry/memory.py @@ -126,6 +126,7 @@ def _get_feature_view_registry(self, feature_view: BaseFeatureView) -> Dict[str, raise FeatureViewNotFoundException(feature_view) def _maybe_init_project_metadata(self, project: str) -> None: + # updates `usage` project uuid to match requested project if project not in self.project_metadata: self.project_metadata[project] = ProjectMetadata(project_name=project) usage.set_current_project_uuid(self.project_metadata[project].project_uuid) @@ -133,9 +134,9 @@ def _maybe_init_project_metadata(self, project: str) -> None: def _delete_object( self, name: str, project: str, registry: Dict[str, FeastResource], on_miss_exc: Exception ) -> None: + # deletes a key from `registry`, or `on_miss_exc` is raised if the object doesn't exist in the registry self._maybe_init_project_metadata(project) - # deletes a key from `registry`, or `on_miss_exc` is raised if the object doesn't exist in the registry key = project_key(project, name) if key not in registry: raise on_miss_exc @@ -144,9 +145,9 @@ def _delete_object( def _get_object( self, name: str, project: str, registry: Dict[str, FeastResource], on_miss_exc: Exception ) -> FeastResource: + # returns a `FeastResource` from the registry, or `on_miss_exc` if the object doesn't exist in the registry self._maybe_init_project_metadata(project) - # returns a `FeastResource` from the registry, or `on_miss_exc` if the object doesn't exist in the registry if not self.is_built: raise RegistryNotBuiltException(registry_name=self.__class__.__name__) key = project_key(project, name) @@ -720,17 +721,16 @@ def get_infra(self, project: str, allow_cache: bool = False) -> Infra: return self.infra[project] def apply_user_metadata(self, project: str, feature_view: BaseFeatureView, metadata_bytes: Optional[bytes]) -> None: - # not supported for in-memory objects + # not supported for BaseFeatureView in-memory objects pass def get_user_metadata(self, project: str, feature_view: BaseFeatureView) -> Optional[bytes]: - # not supported for in-memory objects + # not supported for BaseFeatureView in-memory objects pass def proto(self) -> RegistryProto: r = RegistryProto() - last_updated_timestamps = [] - for project in : + for project in self.project_metadata: for lister, registry_proto_field in [ (self.list_entities, r.entities), (self.list_feature_views, r.feature_views), @@ -743,10 +743,7 @@ def proto(self) -> RegistryProto: (self.list_validation_references, r.validation_references), (self.list_project_metadata, r.project_metadata), ]: - lister_has_udf = lister in {self.list_on_demand_feature_views, self.list_stream_feature_views} - ignore_udfs = self._in_feast_apply_context - objs: List[Any] = lister(project, ignore_udfs=ignore_udfs) if lister_has_udf else lister( - project) # type: ignore + objs: List[Any] = lister(project) if objs: registry_proto_field_data = [] for obj in objs: @@ -758,12 +755,6 @@ def proto(self) -> RegistryProto: registry_proto_field.extend(registry_proto_field_data) r.infra.CopyFrom(self.get_infra(project).to_proto()) - last_updated_metadata = self._get_last_updated_metadata(project) - if last_updated_metadata is not None: - last_updated_timestamps.append(self._get_last_updated_metadata(project)) - - if last_updated_timestamps: - 
r.last_updated.FromDatetime(max(last_updated_timestamps)) return r def commit(self) -> None: From 7eb10902299b2a11b51d8ec13459a8152ff34059 Mon Sep 17 00:00:00 2001 From: Ross Briden Date: Wed, 11 Oct 2023 12:35:14 -0700 Subject: [PATCH 4/9] validate logic --- sdk/python/feast/errors.py | 5 + sdk/python/feast/feature_store.py | 2 +- sdk/python/feast/infra/registry/memory.py | 164 ++++++++++------------ 3 files changed, 83 insertions(+), 88 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 66e1e4dd1b..69b3666da9 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -441,3 +441,8 @@ def __init__(self, service_name: str, project: str) -> None: class MissingInfraObjectException(Exception): def __init__(self, project: str) -> None: super(MissingInfraObjectException, self).__init__(f"No infra objects found for project {project}.") + + +class SavedDatasetCollisionException(Exception): + def __init__(self, project: str, name: str) -> None: + super(SavedDatasetCollisionException, self).__init__(f"Duplicated saved dataset {name} for project {project}") \ No newline at end of file diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 827bc7e7aa..c9d64f4b30 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -175,11 +175,11 @@ def __init__( if registry_config.registry_type == "sql": self._registry = SqlRegistry(registry_config, None, is_feast_apply=is_feast_apply) elif registry_config.registry_type == "memory": - from feast.repo_operations import apply_total self._registry = MemoryRegistry(registry_config, repo_path, is_feast_apply=is_feast_apply) # RB: MemoryRegistry is stateless, meaning we'll need to call `apply` with each new FeatureStore instance if not is_feast_apply: + from feast.repo_operations import apply_total apply_total(repo_config=self.config, repo_path=self.repo_path, skip_source_validation=False, store=self) else: r = Registry(registry_config, repo_path=self.repo_path) diff --git a/sdk/python/feast/infra/registry/memory.py b/sdk/python/feast/infra/registry/memory.py index a5ff3c5865..2211d8e311 100644 --- a/sdk/python/feast/infra/registry/memory.py +++ b/sdk/python/feast/infra/registry/memory.py @@ -36,7 +36,8 @@ DataSourceObjectNotFoundException, SavedDatasetNotFound, DuplicateValidationReference, - MissingInfraObjectException + MissingInfraObjectException, + SavedDatasetCollisionException ) TimeDependentObject = Union[ @@ -68,7 +69,7 @@ def invert_projected_key(projected_key: str) -> Tuple[str, str]: return s[0], s[1] -def list_registry_dict(project: str, registry: Dict[str, Any]) -> List[Any]: +def list_registry_dict(project: str, registry: Dict[str, FeastResource]) -> List[FeastResource]: # returns a shallow copy of all values in registry that belong to `project` return [ copy.copy(v) for k, v in registry.items() if invert_projected_key(k)[0] == project @@ -92,7 +93,6 @@ def __init__( self.is_feast_apply = is_feast_apply self.infra: Dict[str, Infra] = {} - self.entities: Dict[str, Entity] = {} self.feature_services: Dict[str, FeatureService] = {} self.project_metadata: Dict[str, ProjectMetadata] = {} @@ -113,6 +113,18 @@ def __init__( self.request_feature_views ] + # recomputing `RegistryProto` is expensive, cache unless changed + self.cached_proto: Optional[RegistryProto] = None + + def enter_apply_context(self): + self.is_feast_apply = True + + def exit_apply_context(self): + self.is_feast_apply = False + # if this flag is not set, `get_*` operations 
of the registry will fail; this flag is subtly different from + # `is_feast_apply` in that `is_built` remains True if set at least once. + self.is_built = True + def _get_feature_view_registry(self, feature_view: BaseFeatureView) -> Dict[str, BaseFeatureView]: # returns the sub-registry that aligns with `type(feature_view)`, or an exception if the type is unknown if isinstance(feature_view, StreamFeatureView): @@ -127,51 +139,48 @@ def _get_feature_view_registry(self, feature_view: BaseFeatureView) -> Dict[str, def _maybe_init_project_metadata(self, project: str) -> None: # updates `usage` project uuid to match requested project - if project not in self.project_metadata: - self.project_metadata[project] = ProjectMetadata(project_name=project) - usage.set_current_project_uuid(self.project_metadata[project].project_uuid) + metadata = self.project_metadata.setdefault(project, ProjectMetadata(project_name=project)) + usage.set_current_project_uuid(metadata.project_uuid) + + def _maybe_reset_proto_registry(self) -> None: + # set cached proto registry to `None` if write operation is applied and registry is built + if self.is_built: + self.cached_proto = None def _delete_object( self, name: str, project: str, registry: Dict[str, FeastResource], on_miss_exc: Exception ) -> None: # deletes a key from `registry`, or `on_miss_exc` is raised if the object doesn't exist in the registry self._maybe_init_project_metadata(project) - key = project_key(project, name) if key not in registry: raise on_miss_exc del registry[key] + self._maybe_reset_proto_registry() def _get_object( self, name: str, project: str, registry: Dict[str, FeastResource], on_miss_exc: Exception ) -> FeastResource: # returns a `FeastResource` from the registry, or `on_miss_exc` if the object doesn't exist in the registry self._maybe_init_project_metadata(project) - if not self.is_built: raise RegistryNotBuiltException(registry_name=self.__class__.__name__) key = project_key(project, name) if key not in registry: raise on_miss_exc + # always return copies to reduce the likelihood of side effects, BUT notice that this is NOT a deep copy return copy.copy(registry[key]) def _update_object_ts(self, obj: TimeDependentObject) -> TimeDependentObject: # updates the `created_timestamp` and `last_updated_timestamp` attributes of a `TimeDependentObject` + # WARNING: this is an in-place operation! 
now = datetime.utcnow() if not obj.created_timestamp: obj.created_timestamp = now obj.last_updated_timestamp = now return obj - def enter_apply_context(self): - self.is_feast_apply = True - - def exit_apply_context(self): - self.is_feast_apply = False - # if this flag is not set, `get_*` operations of the registry will fail - self.is_built = True - - def apply_entity(self, entity: Entity, project: str, commit: bool = True): + def apply_entity(self, entity: Entity, project: str, commit: bool = True) -> None: """ Registers a single entity with Feast @@ -186,9 +195,12 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True): key = project_key(project, entity.name) if key in self.entities: raise EntityNameCollisionException(entity.name, project) - self.entities[key] = copy.copy(self._update_object_ts(obj=entity)) - def delete_entity(self, name: str, project: str, commit: bool = True): + entity_copy = copy.copy(entity) + self.entities[key] = self._update_object_ts(entity_copy) + self._maybe_reset_proto_registry() + + def delete_entity(self, name: str, project: str, commit: bool = True) -> None: """ Deletes an entity or raises an exception if not found. @@ -214,9 +226,8 @@ def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Enti Returns either the specified entity, or raises an exception if none is found """ - return self._get_object( - name=name, project=project, registry=self.entities, on_miss_exc=EntityNotFoundException(name, project) - ) + exc = EntityNotFoundException(name, project) + return self._get_object(name=name, project=project, registry=self.entities, on_miss_exc=exc) def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]: """ @@ -229,12 +240,9 @@ def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity] Returns: List of entities """ - return list_registry_dict(project, self.entities) + return list_registry_dict(project=project, registry=self.entities) - # Data source operations - def apply_data_source( - self, data_source: DataSource, project: str, commit: bool = True - ): + def apply_data_source(self, data_source: DataSource, project: str, commit: bool = True) -> None: """ Registers a single data source with Feast @@ -244,13 +252,13 @@ def apply_data_source( commit: Whether to immediately commit to the registry """ self._maybe_init_project_metadata(project) - key = project_key(project, data_source.name) if key in self.data_sources: raise DataSourceRepeatNamesException(data_source.name) self.data_sources[key] = copy.copy(data_source) + self._maybe_reset_proto_registry() - def delete_data_source(self, name: str, project: str, commit: bool = True): + def delete_data_source(self, name: str, project: str, commit: bool = True) -> None: """ Deletes a data source or raises an exception if not found. @@ -262,9 +270,7 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): exc = DataSourceObjectNotFoundException(name=name, project=project) self._delete_object(name=name, project=project, registry=self.data_sources, on_miss_exc=exc) - def get_data_source( - self, name: str, project: str, allow_cache: bool = False - ) -> DataSource: + def get_data_source(self, name: str, project: str, allow_cache: bool = False) -> DataSource: """ Retrieves a data source. 
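# The delete/get paths in this file all funnel through the shared _delete_object /
# _get_object helpers: callers pass the sub-registry dict plus the exception to raise on a
# miss, keys are namespaced with project_key, and hits come back as shallow copies. A
# minimal standalone sketch of that pattern (illustrative names only, not the Feast code itself):
import copy
from typing import Any, Dict


def _project_key(project: str, name: str) -> str:
    # flat dicts are shared across projects, so keys are namespaced as "<project>:<name>"
    return f"{project}:{name}"


def _get_object(registry: Dict[str, Any], project: str, name: str, on_miss_exc: Exception) -> Any:
    key = _project_key(project, name)
    if key not in registry:
        raise on_miss_exc
    # hand back a shallow copy so callers cannot mutate the stored object in place
    return copy.copy(registry[key])


def _delete_object(registry: Dict[str, Any], project: str, name: str, on_miss_exc: Exception) -> None:
    key = _project_key(project, name)
    if key not in registry:
        raise on_miss_exc
    del registry[key]


data_sources = {_project_key("prod", "clicks"): {"name": "clicks"}}
assert _get_object(data_sources, "prod", "clicks", KeyError("clicks"))["name"] == "clicks"
_delete_object(data_sources, "prod", "clicks", KeyError("clicks"))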
@@ -279,9 +285,7 @@ def get_data_source( exc = DataSourceObjectNotFoundException(name=name, project=project) return self._get_object(name=name, project=project, registry=self.data_sources, on_miss_exc=exc) - def list_data_sources( - self, project: str, allow_cache: bool = False - ) -> List[DataSource]: + def list_data_sources(self, project: str, allow_cache: bool = False) -> List[DataSource]: """ Retrieve a list of data sources from the registry @@ -292,12 +296,9 @@ def list_data_sources( Returns: List of data sources """ - return list_registry_dict(project, self.data_sources) + return list_registry_dict(project=project, registry=self.data_sources) - # Feature service operations - def apply_feature_service( - self, feature_service: FeatureService, project: str, commit: bool = True - ): + def apply_feature_service(self, feature_service: FeatureService, project: str, commit: bool = True) -> None: """ Registers a single feature service with Feast @@ -306,13 +307,14 @@ def apply_feature_service( project: Feast project that this entity belongs to """ self._maybe_init_project_metadata(project) - key = project_key(project, feature_service.name) if key in self.feature_services: raise FeatureServiceNameCollisionException(service_name=feature_service.name, project=project) - self.feature_services[key] = copy.copy(self._update_object_ts(feature_service)) + service_copy = copy.copy(feature_service) + self.feature_services[key] = self._update_object_ts(service_copy) + self._maybe_reset_proto_registry() - def delete_feature_service(self, name: str, project: str, commit: bool = True): + def delete_feature_service(self, name: str, project: str, commit: bool = True) -> None: """ Deletes a feature service or raises an exception if not found. @@ -324,9 +326,7 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): exc = FeatureServiceNotFoundException(name=name, project=project) self._delete_object(name=name, project=project, registry=self.feature_services, on_miss_exc=exc) - def get_feature_service( - self, name: str, project: str, allow_cache: bool = False - ) -> FeatureService: + def get_feature_service(self, name: str, project: str, allow_cache: bool = False) -> FeatureService: """ Retrieves a feature service. 
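# apply_feature_service above (like the other apply_* methods reworked in this patch) copies
# the incoming object before handing it to _update_object_ts, because that helper stamps
# timestamps in place. A small sketch of why the copy has to come first; the Resource class
# is a stand-in, not a Feast type:
import copy
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Resource:
    name: str
    created_timestamp: Optional[datetime] = None
    last_updated_timestamp: Optional[datetime] = None


def _update_object_ts(obj: Resource) -> Resource:
    # in-place: sets created_timestamp on first write and always bumps last_updated_timestamp
    now = datetime.utcnow()
    if not obj.created_timestamp:
        obj.created_timestamp = now
    obj.last_updated_timestamp = now
    return obj


registry: dict = {}
svc = Resource(name="ranking_service")
registry["prod:ranking_service"] = _update_object_ts(copy.copy(svc))
assert svc.created_timestamp is None                              # caller's object untouched
assert registry["prod:ranking_service"].created_timestamp is not None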
@@ -342,9 +342,7 @@ def get_feature_service( exc = FeatureServiceNotFoundException(name=name, project=project) return self._get_object(name=name, project=project, registry=self.feature_services, on_miss_exc=exc) - def list_feature_services( - self, project: str, allow_cache: bool = False - ) -> List[FeatureService]: + def list_feature_services(self, project: str, allow_cache: bool = False) -> List[FeatureService]: """ Retrieve a list of feature services from the registry @@ -357,9 +355,7 @@ def list_feature_services( """ return list_registry_dict(project, self.feature_services) - def apply_feature_view( - self, feature_view: BaseFeatureView, project: str, commit: bool = True - ): + def apply_feature_view(self, feature_view: BaseFeatureView, project: str, commit: bool = True) -> None: """ Registers a single feature view with Feast @@ -375,9 +371,11 @@ def apply_feature_view( key = project_key(project, feature_view.name) if key in registry: raise ConflictingFeatureViewNames(feature_view.name) - registry[key] = copy.copy(self._update_object_ts(feature_view)) + feature_view_copy = copy.copy(feature_view) + registry[key] = self._update_object_ts(feature_view_copy) + self._maybe_reset_proto_registry() - def delete_feature_view(self, name: str, project: str, commit: bool = True): + def delete_feature_view(self, name: str, project: str, commit: bool = True) -> None: """ Deletes a feature view or raises an exception if not found. @@ -386,14 +384,16 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): project: Feast project that this feature view belongs to commit: Whether the change should be persisted immediately """ + self._maybe_init_project_metadata(project=project) key = project_key(project, name) for registry in self.feature_view_registries: if key in registry: del registry[key] + self._maybe_reset_proto_registry() return raise FeatureViewNotFoundException(name=name, project=project) - def get_stream_feature_view(self, name: str, project: str, allow_cache: bool = False): + def get_stream_feature_view(self, name: str, project: str, allow_cache: bool = False) -> StreamFeatureView: """ Retrieves a stream feature view. @@ -425,9 +425,7 @@ def list_stream_feature_views( """ return list_registry_dict(project, self.stream_feature_views) - def get_on_demand_feature_view( - self, name: str, project: str, allow_cache: bool = False - ) -> OnDemandFeatureView: + def get_on_demand_feature_view(self, name: str, project: str, allow_cache: bool = False) -> OnDemandFeatureView: """ Retrieves an on demand feature view. @@ -459,9 +457,7 @@ def list_on_demand_feature_views( """ return list_registry_dict(project, self.on_demand_feature_views) - def get_feature_view( - self, name: str, project: str, allow_cache: bool = False - ) -> FeatureView: + def get_feature_view(self, name: str, project: str, allow_cache: bool = False) -> FeatureView: """ Retrieves a feature view. 
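# apply_feature_view / delete_feature_view above keep one sub-registry per concrete
# feature-view class: writes dispatch on isinstance (most specific type first, since
# StreamFeatureView subclasses FeatureView), and deletes scan every sub-registry until the
# key is found. A toy sketch of that shape; these classes are stand-ins for the Feast types:
from typing import Dict


class FeatureView: ...
class StreamFeatureView(FeatureView): ...
class OnDemandFeatureView: ...


stream_fvs: Dict[str, object] = {}
plain_fvs: Dict[str, object] = {}
on_demand_fvs: Dict[str, object] = {}
sub_registries = [stream_fvs, plain_fvs, on_demand_fvs]


def _registry_for(fv: object) -> Dict[str, object]:
    if isinstance(fv, StreamFeatureView):
        return stream_fvs
    if isinstance(fv, FeatureView):
        return plain_fvs
    if isinstance(fv, OnDemandFeatureView):
        return on_demand_fvs
    raise TypeError(f"Unknown feature view type {type(fv)!r}")


def _delete_feature_view(key: str) -> None:
    for registry in sub_registries:
        if key in registry:
            del registry[key]
            return
    raise KeyError(key)


_registry_for(StreamFeatureView())["prod:clicks"] = StreamFeatureView()
_delete_feature_view("prod:clicks")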
@@ -477,9 +473,7 @@ def get_feature_view( exc = FeatureViewNotFoundException(name=name, project=project) return self._get_object(name=name, project=project, registry=self.feature_views, on_miss_exc=exc) - def list_feature_views( - self, project: str, allow_cache: bool = False - ) -> List[FeatureView]: + def list_feature_views(self, project: str, allow_cache: bool = False) -> List[FeatureView]: """ Retrieve a list of feature views from the registry @@ -508,9 +502,7 @@ def get_request_feature_view(self, name: str, project: str) -> RequestFeatureVie exc = FeatureViewNotFoundException(name=name, project=project) return self._get_object(name=name, project=project, registry=self.request_feature_views, on_miss_exc=exc) - def list_request_feature_views( - self, project: str, allow_cache: bool = False - ) -> List[RequestFeatureView]: + def list_request_feature_views(self, project: str, allow_cache: bool = False) -> List[RequestFeatureView]: """ Retrieve a list of request feature views from the registry @@ -530,15 +522,15 @@ def apply_materialization( start_date: datetime, end_date: datetime, commit: bool = True, - ): + ) -> None: self._maybe_init_project_metadata(project) - key = project_key(project, feature_view.name) for registry in [self.feature_views, self.stream_feature_views]: if key in registry: fv = registry[key] fv.materialization_intervals.append((start_date, end_date)) fv.last_updated_timestamp = datetime.utcnow() + self._maybe_reset_proto_registry() return raise FeatureViewNotFoundException(feature_view.name, project) @@ -547,7 +539,7 @@ def apply_saved_dataset( saved_dataset: SavedDataset, project: str, commit: bool = True, - ): + ) -> None: """ Stores a saved dataset metadata with Feast @@ -557,11 +549,12 @@ def apply_saved_dataset( commit: Whether the change should be persisted immediately """ self._maybe_init_project_metadata(project) - key = project_key(project, saved_dataset.name) if key in self.saved_datasets: - raise ValueError(f"Duplicate saved dataset {saved_dataset.name} for project {project}.") - self.saved_datasets[key] = copy.copy(self._update_object_ts(saved_dataset)) + raise SavedDatasetCollisionException(project=project, name=saved_dataset.name) + saved_dataset_copy = copy.copy(saved_dataset) + self.saved_datasets[key] = self._update_object_ts(saved_dataset_copy) + self._maybe_reset_proto_registry() def get_saved_dataset( self, name: str, project: str, allow_cache: bool = False @@ -597,9 +590,7 @@ def delete_saved_dataset(self, name: str, project: str, allow_cache: bool = Fals exc = SavedDatasetNotFound(name=name, project=project) self._delete_object(name=name, project=project, registry=self.saved_datasets, on_miss_exc=exc) - def list_saved_datasets( - self, project: str, allow_cache: bool = False - ) -> List[SavedDataset]: + def list_saved_datasets(self, project: str, allow_cache: bool = False) -> List[SavedDataset]: """ Retrieves a list of all saved datasets in specified project @@ -617,7 +608,7 @@ def apply_validation_reference( validation_reference: ValidationReference, project: str, commit: bool = True, - ): + ) -> None: """ Persist a validation reference @@ -627,14 +618,13 @@ def apply_validation_reference( commit: Whether the change should be persisted immediately """ self._maybe_init_project_metadata(project) - key = project_key(project, validation_reference.name) if key in self.validation_references: raise DuplicateValidationReference(name=validation_reference.name, project=project) self.validation_references[key] = copy.copy(validation_reference) + 
self._maybe_reset_proto_registry() - - def delete_validation_reference(self, name: str, project: str, commit: bool = True): + def delete_validation_reference(self, name: str, project: str, commit: bool = True) -> None: """ Deletes a validation reference or raises an exception if not found. @@ -664,9 +654,7 @@ def get_validation_reference( exc = ValidationReferenceNotFound(name=name, project=project) return self._get_object(name=name, project=project, registry=self.validation_references, on_miss_exc=exc) - def list_validation_references( - self, project: str, allow_cache: bool = False - ) -> List[ValidationReference]: + def list_validation_references(self, project: str, allow_cache: bool = False) -> List[ValidationReference]: """ Retrieve a list of validation references from the registry @@ -679,9 +667,7 @@ def list_validation_references( """ return list_registry_dict(project=project, registry=self.validation_references) - def list_project_metadata( - self, project: str, allow_cache: bool = False - ) -> List[ProjectMetadata]: + def list_project_metadata(self, project: str, allow_cache: bool = False) -> List[ProjectMetadata]: """ Retrieves project metadata @@ -704,6 +690,7 @@ def update_infra(self, infra: Infra, project: str, commit: bool = True): commit: Whether the change should be persisted immediately """ self.infra[project] = copy.copy(infra) + self._maybe_reset_proto_registry() def get_infra(self, project: str, allow_cache: bool = False) -> Infra: """ @@ -729,7 +716,10 @@ def get_user_metadata(self, project: str, feature_view: BaseFeatureView) -> Opti pass def proto(self) -> RegistryProto: - r = RegistryProto() + if self.cached_proto: + return self.cached_proto + + r = self.cached_proto = RegistryProto() for project in self.project_metadata: for lister, registry_proto_field in [ (self.list_entities, r.entities), From 3613781619d5c26e23059fbaadfe0d3b037013ba Mon Sep 17 00:00:00 2001 From: Ross Briden Date: Wed, 11 Oct 2023 17:06:37 -0700 Subject: [PATCH 5/9] fix some bugs in SFV copy constructor --- pyproject.toml | 2 +- sdk/python/feast/errors.py | 7 ++++++- sdk/python/feast/infra/registry/memory.py | 20 ++++++++++++-------- sdk/python/feast/repo_operations.py | 3 +-- sdk/python/feast/stream_feature_view.py | 4 ++-- sdk/python/feast/ui/package.json | 2 +- sdk/python/feast/ui/yarn.lock | 8 ++++---- setup.py | 2 +- 8 files changed, 28 insertions(+), 20 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index bfcdfa2548..290d7314d9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "feast-affirm" -version = "0.28+affirm147" +version = "0.28+affirm155" description = "Feast - Affirm" authors = ["Francisco Arceo", "Ross Briden", "Maks Stachowiak"] readme = "README.md" diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 69b3666da9..da572164e6 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -445,4 +445,9 @@ def __init__(self, project: str) -> None: class SavedDatasetCollisionException(Exception): def __init__(self, project: str, name: str) -> None: - super(SavedDatasetCollisionException, self).__init__(f"Duplicated saved dataset {name} for project {project}") \ No newline at end of file + super(SavedDatasetCollisionException, self).__init__(f"Duplicated saved dataset {name} for project {project}") + + +class MissingProjectMetadataException(Exception): + def __init__(self, project: str) -> None: + super(MissingProjectMetadataException, self).__init__(f"No project metadata for project {project}") diff 
--git a/sdk/python/feast/infra/registry/memory.py b/sdk/python/feast/infra/registry/memory.py index 2211d8e311..6d7df92e97 100644 --- a/sdk/python/feast/infra/registry/memory.py +++ b/sdk/python/feast/infra/registry/memory.py @@ -37,9 +37,11 @@ SavedDatasetNotFound, DuplicateValidationReference, MissingInfraObjectException, - SavedDatasetCollisionException + SavedDatasetCollisionException, + MissingProjectMetadataException ) + TimeDependentObject = Union[ BaseFeatureView, FeatureView, @@ -193,7 +195,7 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True) -> Non entity.is_valid() key = project_key(project, entity.name) - if key in self.entities: + if key in self.entities and self.entities[key] != entity: raise EntityNameCollisionException(entity.name, project) entity_copy = copy.copy(entity) @@ -253,7 +255,7 @@ def apply_data_source(self, data_source: DataSource, project: str, commit: bool """ self._maybe_init_project_metadata(project) key = project_key(project, data_source.name) - if key in self.data_sources: + if key in self.data_sources and self.data_sources[key] != data_source: raise DataSourceRepeatNamesException(data_source.name) self.data_sources[key] = copy.copy(data_source) self._maybe_reset_proto_registry() @@ -308,7 +310,7 @@ def apply_feature_service(self, feature_service: FeatureService, project: str, c """ self._maybe_init_project_metadata(project) key = project_key(project, feature_service.name) - if key in self.feature_services: + if key in self.feature_services and self.feature_services[key] != feature_service: raise FeatureServiceNameCollisionException(service_name=feature_service.name, project=project) service_copy = copy.copy(feature_service) self.feature_services[key] = self._update_object_ts(service_copy) @@ -369,7 +371,7 @@ def apply_feature_view(self, feature_view: BaseFeatureView, project: str, commit registry = self._get_feature_view_registry(feature_view) key = project_key(project, feature_view.name) - if key in registry: + if key in registry and registry[key] != feature_view: raise ConflictingFeatureViewNames(feature_view.name) feature_view_copy = copy.copy(feature_view) registry[key] = self._update_object_ts(feature_view_copy) @@ -550,7 +552,7 @@ def apply_saved_dataset( """ self._maybe_init_project_metadata(project) key = project_key(project, saved_dataset.name) - if key in self.saved_datasets: + if key in self.saved_datasets and self.saved_datasets[key] != saved_dataset: raise SavedDatasetCollisionException(project=project, name=saved_dataset.name) saved_dataset_copy = copy.copy(saved_dataset) self.saved_datasets[key] = self._update_object_ts(saved_dataset_copy) @@ -619,7 +621,7 @@ def apply_validation_reference( """ self._maybe_init_project_metadata(project) key = project_key(project, validation_reference.name) - if key in self.validation_references: + if key in self.validation_references and self.validation_references[key] != validation_reference: raise DuplicateValidationReference(name=validation_reference.name, project=project) self.validation_references[key] = copy.copy(validation_reference) self._maybe_reset_proto_registry() @@ -678,7 +680,9 @@ def list_project_metadata(self, project: str, allow_cache: bool = False) -> List Returns: List of project metadata """ - return list_registry_dict(project=project, registry=self.project_metadata) + if project not in self.project_metadata: + raise MissingProjectMetadataException(project=project) + return [self.project_metadata[project]] def update_infra(self, infra: Infra, project: str, 
commit: bool = True): """ diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 9e9b3edad6..4801dd244c 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -115,6 +115,7 @@ def parse_repo(repo_root: Path) -> RepoContents: request_feature_views=[], ) + res.entities.append(DUMMY_ENTITY) for repo_file in get_repo_files(repo_root): module_path = py_path_to_module(repo_file) module = importlib.import_module(module_path) @@ -198,8 +199,6 @@ def parse_repo(repo_root: Path) -> RepoContents: (obj is rfv) for rfv in res.request_feature_views ): res.request_feature_views.append(obj) - - res.entities.append(DUMMY_ENTITY) return res diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 0042e8f046..61ce7757c3 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -295,7 +295,6 @@ def __copy__(self): fv = StreamFeatureView( name=self.name, schema=self.schema, - entities=self.entities, ttl=self.ttl, tags=self.tags, online=self.online, @@ -304,9 +303,10 @@ def __copy__(self): aggregations=self.aggregations, mode=self.mode, timestamp_field=self.timestamp_field, - source=self.source, + source=self.stream_source, udf=self.udf, ) + fv.entities = self.entities fv.projection = copy.copy(self.projection) return fv diff --git a/sdk/python/feast/ui/package.json b/sdk/python/feast/ui/package.json index 22c3e8d4a4..ba2d697246 100644 --- a/sdk/python/feast/ui/package.json +++ b/sdk/python/feast/ui/package.json @@ -6,7 +6,7 @@ "@elastic/datemath": "^5.0.3", "@elastic/eui": "^55.0.1", "@emotion/react": "^11.9.0", - "@feast-dev/feast-ui": "0.33.1", + "@feast-dev/feast-ui": "0.34.1", "@testing-library/jest-dom": "^5.16.4", "@testing-library/react": "^13.2.0", "@testing-library/user-event": "^13.5.0", diff --git a/sdk/python/feast/ui/yarn.lock b/sdk/python/feast/ui/yarn.lock index 729132bf3b..ec7e8a24c0 100644 --- a/sdk/python/feast/ui/yarn.lock +++ b/sdk/python/feast/ui/yarn.lock @@ -1530,10 +1530,10 @@ resolved "https://registry.affirm-stage.com/artifactory/api/npm/npm/@eslint/js/-/js-8.44.0.tgz#961a5903c74139390478bdc808bcde3fc45ab7af" integrity sha1-lhpZA8dBOTkEeL3ICLzeP8Rat68= -"@feast-dev/feast-ui@0.33.1": - version "0.33.1" - resolved "https://registry.yarnpkg.com/@feast-dev/feast-ui/-/feast-ui-0.33.1.tgz#33b1a029e6571cf8e7cba58c8f813c5cd732264a" - integrity sha512-rsbIR7QsbNyo1MJGOfUv+U5DFwYVa9uMCB/EQLps1Dgt7vznm4iew23jX/tsmGgWFYqAV+Dt09uCgJMK+xS7Mw== +"@feast-dev/feast-ui@0.34.1": + version "0.34.1" + resolved "https://registry.affirm-stage.com/artifactory/api/npm/npm/@feast-dev/feast-ui/-/feast-ui-0.34.1.tgz#b06300807868f008f3962835f860f6cc3523623f" + integrity sha1-sGMAgHho8Ajzlig1+GD2zDUjYj8= dependencies: "@elastic/datemath" "^5.0.3" "@elastic/eui" "^55.0.1" diff --git a/setup.py b/setup.py index c5d85c903a..6f4e0dd1a4 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ from distutils.core import setup NAME = "feast" -VERSION = "0.28+affirm147" +VERSION = "0.28+affirm155" DESCRIPTION = "Python SDK for Feast @ Affirm" URL = "https://github.com/feast-dev/feast" AUTHOR = "Feast" From 5a5bc109e5056b218f354e04238ae7415b8a1e01 Mon Sep 17 00:00:00 2001 From: Ross Briden Date: Fri, 13 Oct 2023 09:49:48 -0700 Subject: [PATCH 6/9] checkpoint progress --- pyproject.toml | 2 +- sdk/python/feast/infra/registry/memory.py | 19 +++++++++++-------- setup.py | 2 +- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pyproject.toml 
b/pyproject.toml index 290d7314d9..f9796bcd96 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "feast-affirm" -version = "0.28+affirm155" +version = "0.28+affirm158" description = "Feast - Affirm" authors = ["Francisco Arceo", "Ross Briden", "Maks Stachowiak"] readme = "README.md" diff --git a/sdk/python/feast/infra/registry/memory.py b/sdk/python/feast/infra/registry/memory.py index 6d7df92e97..d364a12f79 100644 --- a/sdk/python/feast/infra/registry/memory.py +++ b/sdk/python/feast/infra/registry/memory.py @@ -1,5 +1,4 @@ import copy -import uuid from typing import Optional, Dict, List, Any, Tuple, Union from pathlib import Path from datetime import datetime @@ -36,12 +35,10 @@ DataSourceObjectNotFoundException, SavedDatasetNotFound, DuplicateValidationReference, - MissingInfraObjectException, SavedDatasetCollisionException, MissingProjectMetadataException ) - TimeDependentObject = Union[ BaseFeatureView, FeatureView, @@ -81,13 +78,12 @@ def list_registry_dict(project: str, registry: Dict[str, FeastResource]) -> List class MemoryRegistry(BaseRegistry): def __init__( self, - repo_config: Optional[RegistryConfig], + registry_config: Optional[RegistryConfig], repo_path: Optional[Path], is_feast_apply: bool = False ) -> None: # unused - self.repo_config = repo_config self.repo_path = repo_path # flag signaling that the registry has been populated; this should be set after a Feast apply operation @@ -123,6 +119,7 @@ def enter_apply_context(self): def exit_apply_context(self): self.is_feast_apply = False + self.proto() # if this flag is not set, `get_*` operations of the registry will fail; this flag is subtly different from # `is_feast_apply` in that `is_built` remains True if set at least once. self.is_built = True @@ -708,7 +705,7 @@ def get_infra(self, project: str, allow_cache: bool = False) -> Infra: The stored Infra object. 
""" if project not in self.infra: - raise MissingInfraObjectException(project) + return Infra() return self.infra[project] def apply_user_metadata(self, project: str, feature_view: BaseFeatureView, metadata_bytes: Optional[bytes]) -> None: @@ -723,7 +720,7 @@ def proto(self) -> RegistryProto: if self.cached_proto: return self.cached_proto - r = self.cached_proto = RegistryProto() + r = RegistryProto() for project in self.project_metadata: for lister, registry_proto_field in [ (self.list_entities, r.entities), @@ -749,6 +746,8 @@ def proto(self) -> RegistryProto: registry_proto_field.extend(registry_proto_field_data) r.infra.CopyFrom(self.get_infra(project).to_proto()) + if self.is_built: + self.cached_proto = r return r def commit(self) -> None: @@ -756,5 +755,9 @@ def commit(self) -> None: pass def refresh(self, project: Optional[str] = None) -> None: - # This is a noop because MemoryRegistry is a cache + self.proto() + if project: + self._maybe_init_project_metadata(project) + + def teardown(self) -> None: pass diff --git a/setup.py b/setup.py index 6f4e0dd1a4..23337246a1 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ from distutils.core import setup NAME = "feast" -VERSION = "0.28+affirm155" +VERSION = "0.28+affirm158" DESCRIPTION = "Python SDK for Feast @ Affirm" URL = "https://github.com/feast-dev/feast" AUTHOR = "Feast" From 8d4bcf830a76f2e9352b1a399a9b6862368a197f Mon Sep 17 00:00:00 2001 From: Ross Briden Date: Tue, 17 Oct 2023 21:29:51 -0700 Subject: [PATCH 7/9] add feature service copy --- sdk/python/feast/data_source.py | 65 +++++++++++++++++++++ sdk/python/feast/entity.py | 13 +++++ sdk/python/feast/feature_logging.py | 7 +++ sdk/python/feast/feature_service.py | 19 ++++++ sdk/python/feast/feature_view.py | 17 ++++-- sdk/python/feast/feature_view_projection.py | 12 +++- sdk/python/feast/field.py | 10 +++- sdk/python/feast/on_demand_feature_view.py | 9 ++- sdk/python/feast/stream_feature_view.py | 38 ++++++++---- 9 files changed, 168 insertions(+), 22 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index b7ce19aad9..81778875e0 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -13,6 +13,7 @@ # limitations under the License. 
import enum +import copy import warnings from abc import ABC, abstractmethod from datetime import timedelta @@ -96,6 +97,14 @@ def to_proto(self) -> DataSourceProto.KafkaOptions: return kafka_options_proto + def __copy__(self): + return KafkaOptions( + kafka_bootstrap_servers=self.kafka_bootstrap_servers, + message_format=self.message_format, + topic=self.topic, + watermark_delay_threshold=self.watermark_delay_threshold + ) + class KinesisOptions: """ @@ -112,6 +121,13 @@ def __init__( self.region = region self.stream_name = stream_name + def __copy__(self): + return KinesisOptions( + record_format=copy.copy(self.record_format), + region=self.region, + stream_name=self.stream_name + ) + @classmethod def from_proto(cls, kinesis_options_proto: DataSourceProto.KinesisOptions): """ @@ -438,6 +454,22 @@ def __eq__(self, other): def __hash__(self): return super().__hash__() + def __copy__(self): + return KafkaSource( + name=self.name, + field_mapping=dict(self.field_mapping), + kafka_bootstrap_servers=self.kafka_options.kafka_bootstrap_servers, + message_format=self.kafka_options.message_format, + watermark_delay_threshold=self.kafka_options.watermark_delay_threshold, + topic=self.kafka_options.topic, + created_timestamp_column=self.created_timestamp_column, + timestamp_field=self.timestamp_field, + description=self.description, + tags=dict(self.tags), + owner=self.owner, + batch_source=copy.copy(self.batch_source) if self.batch_source else None, + ) + @staticmethod def from_proto(data_source: DataSourceProto): watermark_delay_threshold = None @@ -561,6 +593,15 @@ def __eq__(self, other): def __hash__(self): return super().__hash__() + def __copy__(self): + return RequestSource( + name=self.name, + schema=[copy.copy(field) for field in self.schema], + description=self.description, + tags=dict(self.tags), + owner=self.owner + ) + @staticmethod def from_proto(data_source: DataSourceProto): schema_pb = data_source.request_data_options.schema @@ -637,6 +678,21 @@ def from_proto(data_source: DataSourceProto): else None, ) + def __copy__(self): + return KinesisSource( + name=self.name, + timestamp_field=self.timestamp_field, + field_mapping=dict(self.field_mapping), + record_format=copy.copy(self.kinesis_options.record_format), + region=self.kinesis_options.region, + stream_name=self.kinesis_options.stream_name, + created_timestamp_column=self.created_timestamp_column, + description=self.description, + tags=dict(self.tags), + owner=self.owner, + batch_source=copy.copy(self.batch_source) if self.batch_source else None + ) + @staticmethod def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: pass @@ -771,6 +827,15 @@ def __eq__(self, other): def __hash__(self): return super().__hash__() + def __copy__(self): + return PushSource( + name=self.name, + batch_source=copy.copy(self.batch_source), + description=self.description, + tags=dict(self.tags), + owner=self.owner + ) + def validate(self, config: RepoConfig): pass diff --git a/sdk/python/feast/entity.py b/sdk/python/feast/entity.py index 30f04e9c06..34e0093a76 100644 --- a/sdk/python/feast/entity.py +++ b/sdk/python/feast/entity.py @@ -139,6 +139,19 @@ def is_valid(self): if not self.value_type: raise ValueError(f"The entity {self.name} does not have a type.") + def __copy__(self): + entity = Entity( + name=self.name, + join_keys=[self.join_key], + description=self.description, + tags=dict(self.tags), + owner=self.owner, + value_type=self.value_type + ) + entity.created_timestamp = self.created_timestamp + 
entity.last_updated_timestamp = self.last_updated_timestamp + return entity + @classmethod def from_proto(cls, entity_proto: EntityProto): """ diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index bd45c09b0a..6e14c456e2 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -1,4 +1,5 @@ import abc +import copy from typing import TYPE_CHECKING, Dict, Optional, Type, cast import pyarrow as pa @@ -155,6 +156,12 @@ def __init__(self, destination: LoggingDestination, sample_rate: float = 1.0): self.destination = destination self.sample_rate = sample_rate + def __copy__(self): + return LoggingConfig( + destination=copy.copy(self.destination), + sample_rate=self.sample_rate + ) + @classmethod def from_proto(cls, config_proto: LoggingConfigProto) -> Optional["LoggingConfig"]: proto_kind = cast(str, config_proto.WhichOneof("destination")) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index c3037a55da..9dd3ce737f 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -1,3 +1,4 @@ +import copy from datetime import datetime from typing import Dict, List, Optional, Union @@ -185,6 +186,24 @@ def __eq__(self, other): return True + def __copy__(self): + fs = FeatureService( + name=self.name, + features=[], + tags=dict(self.tags), + description=self.description, + owner=self.owner, + logging_config=copy.copy(self.logging_config) + ) + fs.feature_view_projections.extend( + [ + copy.copy(projection) for projection in self.feature_view_projections + ] + ) + fs.created_timestamp = self.created_timestamp + fs.last_updated_timestamp = self.last_updated_timestamp + return fs + @classmethod def from_proto(cls, feature_service_proto: FeatureServiceProto): """ diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index fa98ea29f8..3b0d4653a6 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -220,18 +220,25 @@ def __hash__(self): def __copy__(self): fv = FeatureView( name=self.name, - ttl=self.ttl, - source=self.stream_source if self.stream_source else self.batch_source, - schema=self.schema, - tags=self.tags, + description=self.description, + tags=dict(self.tags), + owner=self.owner, online=self.online, + ttl=self.ttl, + source=copy.copy(self.stream_source if self.stream_source else self.batch_source), ) # This is deliberately set outside of the FV initialization as we do not have the Entity objects. 
- fv.entities = self.entities + fv.entities = list(self.entities) fv.features = copy.copy(self.features) fv.entity_columns = copy.copy(self.entity_columns) fv.projection = copy.copy(self.projection) + + fv.created_timestamp = self.created_timestamp + fv.last_updated_timestamp = self.last_updated_timestamp + + for interval in self.materialization_intervals: + fv.materialization_intervals.append(interval) return fv def __eq__(self, other): diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index 2960996a10..457901c101 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -1,3 +1,4 @@ +import copy from typing import TYPE_CHECKING, Dict, List, Optional from attr import dataclass @@ -38,6 +39,15 @@ class FeatureViewProjection: def name_to_use(self): return self.name_alias or self.name + def __copy__(self): + return FeatureViewProjection( + name=self.name, + name_alias=self.name_alias, + desired_features=self.desired_features, + features=copy.copy(self.features), + join_key_map=dict(self.join_key_map) + ) + def to_proto(self) -> FeatureViewProjectionProto: feature_reference_proto = FeatureViewProjectionProto( feature_view_name=self.name, @@ -67,7 +77,7 @@ def from_proto(proto: FeatureViewProjectionProto): def from_definition(base_feature_view: "BaseFeatureView"): return FeatureViewProjection( name=base_feature_view.name, - name_alias=None, + name_alias="", features=base_feature_view.features, desired_features=[], ) diff --git a/sdk/python/feast/field.py b/sdk/python/feast/field.py index 245bb24f52..93c641b896 100644 --- a/sdk/python/feast/field.py +++ b/sdk/python/feast/field.py @@ -67,7 +67,7 @@ def __eq__(self, other): if ( self.name != other.name - or self.dtype != other.dtype + or self.dtype.to_value_type() != other.dtype.to_value_type() or self.description != other.description or self.tags != other.tags ): @@ -86,6 +86,14 @@ def __repr__(self): def __str__(self): return f"Field(name={self.name}, dtype={self.dtype}, tags={self.tags})" + def __copy__(self): + return Field( + name=self.name, + dtype=self.dtype, + description=self.description, + tags=dict(self.tags) + ) + def to_proto(self) -> FieldProto: """Converts a Field object to its protobuf representation.""" value_type = self.dtype.to_value_type() diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 3ec9974bcf..4a05b00277 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -151,9 +151,10 @@ def proto_class(self) -> Type[OnDemandFeatureViewProto]: def __copy__(self): fv = OnDemandFeatureView( name=self.name, - schema=self.features, - sources=list(self.source_feature_view_projections.values()) - + list(self.source_request_sources.values()), + schema=[copy.copy(feature) for feature in self.features], + sources=copy.copy( + list(self.source_feature_view_projections.values()) + list(self.source_request_sources.values()) + ), udf=self.udf, udf_string=self.udf_string, mode=self.mode, @@ -162,6 +163,8 @@ def __copy__(self): owner=self.owner, ) fv.projection = copy.copy(self.projection) + fv.created_timestamp = self.created_timestamp + fv.last_updated_timestamp = self.last_updated_timestamp return fv def __eq__(self, other): diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 61ce7757c3..6def14f3f7 100644 --- a/sdk/python/feast/stream_feature_view.py +++ 
b/sdk/python/feast/stream_feature_view.py @@ -191,7 +191,7 @@ def to_proto(self): name=self.name, entities=self.entities, entity_columns=[field.to_proto() for field in self.entity_columns], - features=[field.to_proto() for field in self.schema], + features=[field.to_proto() for field in self.features], user_defined_function=udf_proto, description=self.description, tags=self.tags, @@ -231,15 +231,14 @@ def from_proto(cls, sfv_proto, skip_udf=False): if sfv_proto.spec.HasField("user_defined_function") else None ) + + # RB: don't pass in a schema, we'll manually set it later. stream_feature_view = cls( name=sfv_proto.spec.name, description=sfv_proto.spec.description, tags=dict(sfv_proto.spec.tags), owner=sfv_proto.spec.owner, online=sfv_proto.spec.online, - schema=[ - Field.from_proto(field_proto) for field_proto in sfv_proto.spec.features - ], ttl=( timedelta(days=0) if sfv_proto.spec.ttl.ToNanoseconds() == 0 @@ -288,26 +287,41 @@ def from_proto(cls, sfv_proto, skip_udf=False): utils.make_tzaware(interval.end_time.ToDatetime()), ) ) - return stream_feature_view def __copy__(self): fv = StreamFeatureView( name=self.name, - schema=self.schema, - ttl=self.ttl, - tags=self.tags, - online=self.online, description=self.description, + tags=dict(self.tags), owner=self.owner, - aggregations=self.aggregations, + online=self.online, + ttl=self.ttl, + source=copy.copy(self.stream_source), + aggregations=copy.copy(self.aggregations), mode=self.mode, timestamp_field=self.timestamp_field, - source=self.stream_source, udf=self.udf, + udf_string=self.udf_string ) - fv.entities = self.entities + + if self.batch_source: + fv.batch_source = copy.copy(self.batch_source) + if self.stream_source: + fv.stream_source = copy.copy(self.stream_source) + + fv.created_timestamp = self.created_timestamp + fv.last_updated_timestamp = self.last_updated_timestamp + + for interval in self.materialization_intervals: + fv.materialization_intervals.append(interval) + + fv.entity_columns = copy.copy(self.entity_columns) + fv.entities = copy.copy(self.entities) fv.projection = copy.copy(self.projection) + + # make this consistent with proto + fv.features = copy.copy(self.features) return fv From a74ee54e4138907a0a3863e23e35f80662ee10da Mon Sep 17 00:00:00 2001 From: Ross Briden Date: Tue, 17 Oct 2023 23:01:45 -0700 Subject: [PATCH 8/9] add __copy__ impl for spark sources --- sdk/python/feast/feature_store.py | 21 ++++++++++--------- .../spark_offline_store/spark_source.py | 15 +++++++++++++ .../time_dependent_spark_source.py | 15 +++++++++++++ sdk/python/feast/repo_config.py | 3 +++ 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c9d64f4b30..a3e872256b 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1015,15 +1015,15 @@ def apply( tables_to_delete: List[FeatureView] = views_to_delete + sfvs_to_delete if not partial else [] # type: ignore tables_to_keep: List[FeatureView] = views_to_update + sfvs_to_update # type: ignore - self._get_provider().update_infra( - project=self.project, - tables_to_delete=tables_to_delete, - tables_to_keep=tables_to_keep, - entities_to_delete=entities_to_delete if not partial else [], - entities_to_keep=entities_to_update, - partial=partial, - ) - + if not self.config.ignore_infra_changes: + self._get_provider().update_infra( + project=self.project, + tables_to_delete=tables_to_delete, + tables_to_keep=tables_to_keep, + entities_to_delete=entities_to_delete if not partial 
else [], + entities_to_keep=entities_to_update, + partial=partial, + ) self._registry.commit() @log_exceptions_and_usage @@ -1036,7 +1036,8 @@ def teardown(self): entities = self.list_entities() - self._get_provider().teardown_infra(self.project, tables, entities) + if not self.config.ignore_infra_changes: + self._get_provider().teardown_infra(self.project, tables, entities) self._registry.teardown() @log_exceptions_and_usage diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index 801c5094ec..b2da514f3f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -112,6 +112,21 @@ def file_format(self): """ return self.spark_options.file_format + def __copy__(self): + return SparkSource( + name=self.name, + field_mapping=dict(self.field_mapping), + table=self.spark_options.table, + query=self.spark_options.query, + path=self.spark_options.path, + file_format=self.spark_options.file_format, + timestamp_field=self.timestamp_field, + created_timestamp_column=self.created_timestamp_column, + description=self.description, + tags=dict(self.tags), + owner=self.owner, + ) + @staticmethod def from_proto(data_source: DataSourceProto) -> Any: from feast.infra.offline_stores.contrib.spark_offline_store.time_dependent_spark_source import ( diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/time_dependent_spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/time_dependent_spark_source.py index 8af17e4545..67b9a0171a 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/time_dependent_spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/time_dependent_spark_source.py @@ -89,6 +89,21 @@ def __init__( owner=owner, ) + def __copy__(self): + return TimeDependentSparkSource( + name=self.name, + path_prefix=self.path_prefix, + time_fmt_str=self.time_fmt_str, + path_suffix=self.path_suffix, + file_format=self.spark_options.file_format, + created_timestamp_column=self.created_timestamp_column, + timestamp_field=self.timestamp_field, + field_mapping=dict(self.field_mapping), + tags=dict(self.tags), + owner=self.owner, + description=self.description, + ) + @property def time_dependent_path(self) -> str: return os.path.join(self.path_prefix, self.time_fmt_str, self.path_suffix) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 7064746b95..781db4c758 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -165,6 +165,9 @@ class RepoConfig(FeastBaseModel): coerce_tz_aware: Optional[bool] = True """ If True, coerces entity_df timestamp columns to be timezone aware (to UTC by default). 
""" + ignore_infra_changes: bool = False + """ Signals whether to apply infrastructure changes during Registry creation """ + def __init__(self, **data: Any): super().__init__(**data) From d16b75063a8297448ca83a25af84ed5eba9aeeb0 Mon Sep 17 00:00:00 2001 From: Ross Briden Date: Sun, 29 Oct 2023 15:12:23 -0700 Subject: [PATCH 9/9] add toml --- pyproject.toml | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f9796bcd96..d1940b6913 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "feast-affirm" -version = "0.28+affirm158" +version = "0.28+affirm170" description = "Feast - Affirm" authors = ["Francisco Arceo", "Ross Briden", "Maks Stachowiak"] readme = "README.md" diff --git a/setup.py b/setup.py index 23337246a1..628e46d478 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ from distutils.core import setup NAME = "feast" -VERSION = "0.28+affirm158" +VERSION = "0.28+affirm170" DESCRIPTION = "Python SDK for Feast @ Affirm" URL = "https://github.com/feast-dev/feast" AUTHOR = "Feast"
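The __copy__ methods added across patches 7 and 8 all exist for the same reason: the in-memory registry hands out copy.copy()'d objects, and copy.copy() only produces a safely detached instance when the class rebuilds its mutable fields itself. A minimal illustration of that hook, using a toy class rather than a real Feast source:

import copy


class ToySource:
    def __init__(self, name: str, tags: dict):
        self.name = name
        self.tags = tags

    def __copy__(self) -> "ToySource":
        # rebuild field by field so mutable members (here, tags) are not shared
        return ToySource(name=self.name, tags=dict(self.tags))


stored = ToySource("clicks", {"team": "ml"})
handed_out = copy.copy(stored)        # copy.copy() delegates to ToySource.__copy__
handed_out.tags["team"] = "growth"
assert stored.tags["team"] == "ml"    # the registry-held object is unaffected

Without __copy__, copy.copy() falls back to a plain shallow copy that shares dicts like tags between the stored object and the one returned to callers; that is exactly the side effect these patches are trying to avoid.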