diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d7c27decd..14d5a70a0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +## 0.5.13 (2024-04-17) + +### Features + +- Delete entities that doesn't passed the selector on real time events + + ## 0.5.12 (2024-04-12) ### Features diff --git a/docs/framework-guides/docs/framework/features/live-events.md b/docs/framework-guides/docs/framework/features/live-events.md index 7c6f9a3e0d..7c904f20a4 100644 --- a/docs/framework-guides/docs/framework/features/live-events.md +++ b/docs/framework-guides/docs/framework/features/live-events.md @@ -58,7 +58,7 @@ For example, if you configure the integration to listen to the `/webhook` endpoi ::: -Here is an example definition that exposes a `/integrations/webhook` route the integration will listen to: +Here is an example definition that exposes a `/integration/webhook` route the integration will listen to: ```python showLineNumbers from port_ocean.context.ocean import ocean diff --git a/integrations/dynatrace/config.yaml b/integrations/dynatrace/config.yaml index 5dfe818219..781d8b46d6 100644 --- a/integrations/dynatrace/config.yaml +++ b/integrations/dynatrace/config.yaml @@ -1,9 +1,8 @@ # This is an example configuration file for the integration service. # Please copy this file to config.yaml file in the integration folder and edit it to your needs. port: - clientId: "{{ from env PORT_CLIENT_ID }}" # Can be loaded via environment variable: PORT_CLIENT_ID - clientSecret: "{{ from env PORT_CLIENT_SECRET }}" # Can be loaded via environment variable: PORT_CLIENT_SECRET - + clientId: "{{ from env PORT_CLIENT_ID }}" # Can be loaded via environment variable: PORT_CLIENT_ID + clientSecret: "{{ from env PORT_CLIENT_SECRET }}" # Can be loaded via environment variable: PORT_CLIENT_SECRET # The event listener to use for the integration service. eventListener: type: POLLING diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index eb4c82f81f..634770d5e1 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -1,4 +1,5 @@ import asyncio +from typing import Any from urllib.parse import quote_plus import httpx @@ -133,8 +134,10 @@ async def batch_delete_entities( return_exceptions=True, ) - async def search_entities(self, user_agent_type: UserAgentType) -> list[Entity]: - query = { + async def search_entities( + self, user_agent_type: UserAgentType, query: dict[Any, Any] | None = None + ) -> list[Entity]: + default_query = { "combinator": "and", "rules": [ { @@ -150,6 +153,11 @@ async def search_entities(self, user_agent_type: UserAgentType) -> list[Entity]: ], } + if query is None: + query = default_query + elif query.get("rules"): + query["rules"].append(default_query) + logger.info(f"Searching entities with query {query}") response = await self.client.post( f"{self.auth.api_url}/entities/search", @@ -164,3 +172,28 @@ async def search_entities(self, user_agent_type: UserAgentType) -> list[Entity]: ) handle_status_code(response) return [Entity.parse_obj(result) for result in response.json()["entities"]] + + async def does_integration_has_ownership_over_entity( + self, entity: Entity, user_agent_type: UserAgentType + ) -> bool: + logger.info(f"Validating ownership on entity {entity.identifier}") + found_entities: list[Entity] = await self.search_entities( + user_agent_type, + { + "combinator": "and", + "rules": [ + { + "property": "$identifier", + "operator": "contains", + "value": entity.identifier, + }, + { + "property": "$blueprint", + "operator": "contains", + "value": entity.blueprint, + }, + ], + }, + ) + + return len(found_entities) > 0 diff --git a/port_ocean/core/handlers/entity_processor/base.py b/port_ocean/core/handlers/entity_processor/base.py index 1a56415cb0..368cb9ada9 100644 --- a/port_ocean/core/handlers/entity_processor/base.py +++ b/port_ocean/core/handlers/entity_processor/base.py @@ -6,7 +6,10 @@ from port_ocean.core.handlers.base import BaseHandler from port_ocean.core.handlers.port_app_config.models import ResourceConfig from port_ocean.core.models import Entity -from port_ocean.core.ocean_types import RawEntityDiff, EntityDiff +from port_ocean.core.ocean_types import ( + RawEntity, + EntitySelectorDiff, +) @dataclass @@ -33,21 +36,28 @@ class BaseEntityProcessor(BaseHandler): @abstractmethod async def _parse_items( - self, mapping: ResourceConfig, raw_data: RawEntityDiff - ) -> EntityDiff: + self, + mapping: ResourceConfig, + raw_data: list[RawEntity], + parse_all: bool = False, + ) -> EntitySelectorDiff: pass async def parse_items( - self, mapping: ResourceConfig, raw_data: RawEntityDiff - ) -> EntityDiff: + self, + mapping: ResourceConfig, + raw_data: list[RawEntity], + parse_all: bool = False, + ) -> EntitySelectorDiff: """Public method to parse raw entity data and map it to an EntityDiff. Args: mapping (ResourceConfig): The configuration for entity mapping. - raw_data (RawEntityDiff): The raw data to be parsed. + raw_data (list[RawEntity]): The raw data to be parsed. + parse_all (bool): Whether to parse all data or just data that passed the selector. Returns: EntityDiff: The parsed entity differences. """ with logger.contextualize(kind=mapping.kind): - return await self._parse_items(mapping, raw_data) + return await self._parse_items(mapping, raw_data, parse_all) diff --git a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py index fadcfafa9f..feee1d0588 100644 --- a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py +++ b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py @@ -9,7 +9,10 @@ from port_ocean.core.handlers.entity_processor.base import BaseEntityProcessor from port_ocean.core.handlers.port_app_config.models import ResourceConfig from port_ocean.core.models import Entity -from port_ocean.core.ocean_types import RawEntityDiff, EntityDiff +from port_ocean.core.ocean_types import ( + RawEntity, + EntitySelectorDiff, +) from port_ocean.exceptions.core import EntityProcessorException @@ -68,16 +71,19 @@ async def _search_as_object( return result - async def _get_entity_if_passed_selector( + async def _get_mapped_entity( self, data: dict[str, Any], raw_entity_mappings: dict[str, Any], selector_query: str, - ) -> dict[str, Any]: + parse_all: bool = False, + ) -> tuple[dict[str, Any], bool]: should_run = await self._search_as_bool(data, selector_query) - if should_run: - return await self._search_as_object(data, raw_entity_mappings) - return {} + if parse_all or should_run: + mapped_entity = await self._search_as_object(data, raw_entity_mappings) + return mapped_entity, should_run + + return {}, False async def _calculate_entity( self, @@ -85,16 +91,18 @@ async def _calculate_entity( raw_entity_mappings: dict[str, Any], items_to_parse: str | None, selector_query: str, - ) -> list[dict[str, Any]]: + parse_all: bool = False, + ) -> list[tuple[dict[str, Any], bool]]: if items_to_parse: items = await self._search(data, items_to_parse) if isinstance(items, list): return await asyncio.gather( *[ - self._get_entity_if_passed_selector( + self._get_mapped_entity( {"item": item, **data}, raw_entity_mappings, selector_query, + parse_all, ) for item in items ] @@ -105,51 +113,44 @@ async def _calculate_entity( ) else: return [ - await self._get_entity_if_passed_selector( - data, raw_entity_mappings, selector_query + await self._get_mapped_entity( + data, raw_entity_mappings, selector_query, parse_all ) ] - return [{}] + return [({}, False)] - async def _calculate_entities( - self, mapping: ResourceConfig, raw_data: list[dict[str, Any]] - ) -> list[Entity]: + async def _parse_items( + self, + mapping: ResourceConfig, + raw_results: list[RawEntity], + parse_all: bool = False, + ) -> EntitySelectorDiff: raw_entity_mappings: dict[str, Any] = mapping.port.entity.mappings.dict( exclude_unset=True ) - entities_tasks = [ + calculate_entities_tasks = [ asyncio.create_task( self._calculate_entity( data, raw_entity_mappings, mapping.port.items_to_parse, mapping.selector.query, + parse_all, ) ) - for data in raw_data - ] - entities = await asyncio.gather(*entities_tasks) - - return [ - Entity.parse_obj(entity_data) - for flatten in entities - for entity_data in filter( - lambda entity: entity.get("identifier") and entity.get("blueprint"), - flatten, - ) + for data in raw_results ] - - async def _parse_items( - self, mapping: ResourceConfig, raw_results: RawEntityDiff - ) -> EntityDiff: - entities_before: list[Entity] = await self._calculate_entities( - mapping, raw_results["before"] - ) - entities_after: list[Entity] = await self._calculate_entities( - mapping, raw_results["after"] - ) - - return { - "before": entities_before, - "after": entities_after, - } + calculate_entities_results = await asyncio.gather(*calculate_entities_tasks) + + passed_entities = [] + failed_entities = [] + for entities_results in calculate_entities_results: + for entity, did_entity_pass_selector in entities_results: + if entity.get("identifier") and entity.get("blueprint"): + parsed_entity = Entity.parse_obj(entity) + if did_entity_pass_selector: + passed_entities.append(parsed_entity) + else: + failed_entities.append(parsed_entity) + + return {"passed": passed_entities, "failed": failed_entities} diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 67d60f0976..f29e6d2780 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -24,6 +24,8 @@ RawEntityDiff, EntityDiff, ASYNC_GENERATOR_RESYNC_TYPE, + RawEntity, + EntitySelectorDiff, ) from port_ocean.core.utils import zip_and_sum from port_ocean.exceptions.core import OceanAbortException @@ -120,12 +122,13 @@ async def _execute_resync_tasks( return results, errors async def _calculate_raw( - self, raw_diff: list[tuple[ResourceConfig, RawEntityDiff]] - ) -> list[EntityDiff]: - logger.info("Calculating diff in entities between states") + self, + raw_diff: list[tuple[ResourceConfig, list[RawEntity]]], + parse_all: bool = False, + ) -> list[EntitySelectorDiff]: return await asyncio.gather( *( - self.entity_processor.parse_items(mapping, results) + self.entity_processor.parse_items(mapping, results, parse_all) for mapping, results in raw_diff ) ) @@ -135,42 +138,41 @@ async def _register_resource_raw( resource: ResourceConfig, results: list[dict[Any, Any]], user_agent_type: UserAgentType, + parse_all: bool = False, ) -> list[Entity]: - objects_diff = await self._calculate_raw( - [ - ( - resource, - { - "before": [], - "after": results, - }, - ) - ] - ) + objects_diff = await self._calculate_raw([(resource, results)], parse_all) - entities_after: list[Entity] = objects_diff[0]["after"] + entities_after: list[Entity] = objects_diff[0]["passed"] await self.entities_state_applier.upsert(entities_after, user_agent_type) + + # If an entity didn't pass the JQ selector, we want to delete it if it exists in Port + for entity_to_delete in objects_diff[0]["failed"]: + is_owner = ( + await ocean.port_client.does_integration_has_ownership_over_entity( + entity_to_delete, user_agent_type + ) + ) + if not is_owner: + logger.info( + f"Skipping deletion of entity {entity_to_delete.identifier}, " + f"Couldn't find an entity that's related to the current integration." + ) + continue + await self.entities_state_applier.delete( + objects_diff[0]["failed"], user_agent_type + ) + return entities_after async def _unregister_resource_raw( self, resource: ResourceConfig, - results: list[dict[Any, Any]], + results: list[RawEntity], user_agent_type: UserAgentType, ) -> list[Entity]: - objects_diff = await self._calculate_raw( - [ - ( - resource, - { - "before": results, - "after": [], - }, - ) - ] - ) + objects_diff = await self._calculate_raw([(resource, results)]) - entities_after: list[Entity] = objects_diff[0]["before"] + entities_after: list[Entity] = objects_diff[0]["passed"] await self.entities_state_applier.delete(entities_after, user_agent_type) logger.info("Finished unregistering change") return entities_after @@ -233,7 +235,7 @@ async def register_raw( return await asyncio.gather( *( - self._register_resource_raw(resource, results, user_agent_type) + self._register_resource_raw(resource, results, user_agent_type, True) for resource in resource_mappings ) ) @@ -293,19 +295,31 @@ async def update_raw_diff( with logger.contextualize(kind=kind): logger.info(f"Found {len(resource_mappings)} resources for {kind}") - objects_diff = await self._calculate_raw( - [(mapping, raw_desired_state) for mapping in resource_mappings] + entities_before = await self._calculate_raw( + [ + (mapping, raw_desired_state["before"]) + for mapping in resource_mappings + ] ) - entities_before, entities_after = zip_and_sum( - ( - (entities_change["before"], entities_change["after"]) - for entities_change in objects_diff - ) + entities_after = await self._calculate_raw( + [(mapping, raw_desired_state["after"]) for mapping in resource_mappings] ) + entities_before_flatten = [ + item + for sublist in [d["passed"] for d in entities_before] + for item in sublist + ] + entities_after_flatten = [ + item + for sublist in [d["passed"] for d in entities_after] + for item in sublist + ] + await self.entities_state_applier.apply_diff( - {"before": entities_before, "after": entities_after}, user_agent_type + {"before": entities_before_flatten, "after": entities_after_flatten}, + user_agent_type, ) async def sync_raw_all( diff --git a/port_ocean/core/ocean_types.py b/port_ocean/core/ocean_types.py index 35358d38af..5862b118d8 100644 --- a/port_ocean/core/ocean_types.py +++ b/port_ocean/core/ocean_types.py @@ -3,9 +3,12 @@ from port_ocean.core.models import Entity +RawEntity = dict[Any, Any] + + class RawEntityDiff(TypedDict): - before: list[dict[Any, Any]] - after: list[dict[Any, Any]] + before: list[RawEntity] + after: list[RawEntity] class EntityDiff(TypedDict): @@ -13,6 +16,11 @@ class EntityDiff(TypedDict): after: list[Entity] +class EntitySelectorDiff(TypedDict): + passed: list[Entity] + failed: list[Entity] + + RAW_ITEM = dict[Any, Any] RAW_RESULT = list[RAW_ITEM] ASYNC_GENERATOR_RESYNC_TYPE = AsyncIterator[RAW_RESULT] diff --git a/pyproject.toml b/pyproject.toml index 1d01cefe8a..7c2a685a98 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.5.12" +version = "0.5.13" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"