Skip to content

Commit

Permalink
[Framework] Delete entities which didn't pass the mapping selector in…
Browse files Browse the repository at this point in the history
… register_raw (#490)

# Description

What - Delete entities which didn't pass the mapping selector in
register_raw

## Type of change

Please leave one option from the following and delete the rest:

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [X] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.
  • Loading branch information
omby8888 authored Apr 17, 2024
1 parent 9eeab78 commit f3ce269
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 95 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.5.13 (2024-04-17)

### Features

- Delete entities that doesn't passed the selector on real time events


## 0.5.12 (2024-04-12)

### Features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ For example, if you configure the integration to listen to the `/webhook` endpoi

:::

Here is an example definition that exposes a `/integrations/webhook` route the integration will listen to:
Here is an example definition that exposes a `/integration/webhook` route the integration will listen to:

```python showLineNumbers
from port_ocean.context.ocean import ocean
Expand Down
5 changes: 2 additions & 3 deletions integrations/dynatrace/config.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
# This is an example configuration file for the integration service.
# Please copy this file to config.yaml file in the integration folder and edit it to your needs.
port:
clientId: "{{ from env PORT_CLIENT_ID }}" # Can be loaded via environment variable: PORT_CLIENT_ID
clientSecret: "{{ from env PORT_CLIENT_SECRET }}" # Can be loaded via environment variable: PORT_CLIENT_SECRET

clientId: "{{ from env PORT_CLIENT_ID }}" # Can be loaded via environment variable: PORT_CLIENT_ID
clientSecret: "{{ from env PORT_CLIENT_SECRET }}" # Can be loaded via environment variable: PORT_CLIENT_SECRET
# The event listener to use for the integration service.
eventListener:
type: POLLING
Expand Down
37 changes: 35 additions & 2 deletions port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from typing import Any
from urllib.parse import quote_plus

import httpx
Expand Down Expand Up @@ -133,8 +134,10 @@ async def batch_delete_entities(
return_exceptions=True,
)

async def search_entities(self, user_agent_type: UserAgentType) -> list[Entity]:
query = {
async def search_entities(
self, user_agent_type: UserAgentType, query: dict[Any, Any] | None = None
) -> list[Entity]:
default_query = {
"combinator": "and",
"rules": [
{
Expand All @@ -150,6 +153,11 @@ async def search_entities(self, user_agent_type: UserAgentType) -> list[Entity]:
],
}

if query is None:
query = default_query
elif query.get("rules"):
query["rules"].append(default_query)

logger.info(f"Searching entities with query {query}")
response = await self.client.post(
f"{self.auth.api_url}/entities/search",
Expand All @@ -164,3 +172,28 @@ async def search_entities(self, user_agent_type: UserAgentType) -> list[Entity]:
)
handle_status_code(response)
return [Entity.parse_obj(result) for result in response.json()["entities"]]

async def does_integration_has_ownership_over_entity(
self, entity: Entity, user_agent_type: UserAgentType
) -> bool:
logger.info(f"Validating ownership on entity {entity.identifier}")
found_entities: list[Entity] = await self.search_entities(
user_agent_type,
{
"combinator": "and",
"rules": [
{
"property": "$identifier",
"operator": "contains",
"value": entity.identifier,
},
{
"property": "$blueprint",
"operator": "contains",
"value": entity.blueprint,
},
],
},
)

return len(found_entities) > 0
24 changes: 17 additions & 7 deletions port_ocean/core/handlers/entity_processor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from port_ocean.core.handlers.base import BaseHandler
from port_ocean.core.handlers.port_app_config.models import ResourceConfig
from port_ocean.core.models import Entity
from port_ocean.core.ocean_types import RawEntityDiff, EntityDiff
from port_ocean.core.ocean_types import (
RawEntity,
EntitySelectorDiff,
)


@dataclass
Expand All @@ -33,21 +36,28 @@ class BaseEntityProcessor(BaseHandler):

@abstractmethod
async def _parse_items(
self, mapping: ResourceConfig, raw_data: RawEntityDiff
) -> EntityDiff:
self,
mapping: ResourceConfig,
raw_data: list[RawEntity],
parse_all: bool = False,
) -> EntitySelectorDiff:
pass

async def parse_items(
self, mapping: ResourceConfig, raw_data: RawEntityDiff
) -> EntityDiff:
self,
mapping: ResourceConfig,
raw_data: list[RawEntity],
parse_all: bool = False,
) -> EntitySelectorDiff:
"""Public method to parse raw entity data and map it to an EntityDiff.
Args:
mapping (ResourceConfig): The configuration for entity mapping.
raw_data (RawEntityDiff): The raw data to be parsed.
raw_data (list[RawEntity]): The raw data to be parsed.
parse_all (bool): Whether to parse all data or just data that passed the selector.
Returns:
EntityDiff: The parsed entity differences.
"""
with logger.contextualize(kind=mapping.kind):
return await self._parse_items(mapping, raw_data)
return await self._parse_items(mapping, raw_data, parse_all)
83 changes: 42 additions & 41 deletions port_ocean/core/handlers/entity_processor/jq_entity_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
from port_ocean.core.handlers.entity_processor.base import BaseEntityProcessor
from port_ocean.core.handlers.port_app_config.models import ResourceConfig
from port_ocean.core.models import Entity
from port_ocean.core.ocean_types import RawEntityDiff, EntityDiff
from port_ocean.core.ocean_types import (
RawEntity,
EntitySelectorDiff,
)
from port_ocean.exceptions.core import EntityProcessorException


Expand Down Expand Up @@ -68,33 +71,38 @@ async def _search_as_object(

return result

async def _get_entity_if_passed_selector(
async def _get_mapped_entity(
self,
data: dict[str, Any],
raw_entity_mappings: dict[str, Any],
selector_query: str,
) -> dict[str, Any]:
parse_all: bool = False,
) -> tuple[dict[str, Any], bool]:
should_run = await self._search_as_bool(data, selector_query)
if should_run:
return await self._search_as_object(data, raw_entity_mappings)
return {}
if parse_all or should_run:
mapped_entity = await self._search_as_object(data, raw_entity_mappings)
return mapped_entity, should_run

return {}, False

async def _calculate_entity(
self,
data: dict[str, Any],
raw_entity_mappings: dict[str, Any],
items_to_parse: str | None,
selector_query: str,
) -> list[dict[str, Any]]:
parse_all: bool = False,
) -> list[tuple[dict[str, Any], bool]]:
if items_to_parse:
items = await self._search(data, items_to_parse)
if isinstance(items, list):
return await asyncio.gather(
*[
self._get_entity_if_passed_selector(
self._get_mapped_entity(
{"item": item, **data},
raw_entity_mappings,
selector_query,
parse_all,
)
for item in items
]
Expand All @@ -105,51 +113,44 @@ async def _calculate_entity(
)
else:
return [
await self._get_entity_if_passed_selector(
data, raw_entity_mappings, selector_query
await self._get_mapped_entity(
data, raw_entity_mappings, selector_query, parse_all
)
]
return [{}]
return [({}, False)]

async def _calculate_entities(
self, mapping: ResourceConfig, raw_data: list[dict[str, Any]]
) -> list[Entity]:
async def _parse_items(
self,
mapping: ResourceConfig,
raw_results: list[RawEntity],
parse_all: bool = False,
) -> EntitySelectorDiff:
raw_entity_mappings: dict[str, Any] = mapping.port.entity.mappings.dict(
exclude_unset=True
)
entities_tasks = [
calculate_entities_tasks = [
asyncio.create_task(
self._calculate_entity(
data,
raw_entity_mappings,
mapping.port.items_to_parse,
mapping.selector.query,
parse_all,
)
)
for data in raw_data
]
entities = await asyncio.gather(*entities_tasks)

return [
Entity.parse_obj(entity_data)
for flatten in entities
for entity_data in filter(
lambda entity: entity.get("identifier") and entity.get("blueprint"),
flatten,
)
for data in raw_results
]

async def _parse_items(
self, mapping: ResourceConfig, raw_results: RawEntityDiff
) -> EntityDiff:
entities_before: list[Entity] = await self._calculate_entities(
mapping, raw_results["before"]
)
entities_after: list[Entity] = await self._calculate_entities(
mapping, raw_results["after"]
)

return {
"before": entities_before,
"after": entities_after,
}
calculate_entities_results = await asyncio.gather(*calculate_entities_tasks)

passed_entities = []
failed_entities = []
for entities_results in calculate_entities_results:
for entity, did_entity_pass_selector in entities_results:
if entity.get("identifier") and entity.get("blueprint"):
parsed_entity = Entity.parse_obj(entity)
if did_entity_pass_selector:
passed_entities.append(parsed_entity)
else:
failed_entities.append(parsed_entity)

return {"passed": passed_entities, "failed": failed_entities}
Loading

0 comments on commit f3ce269

Please sign in to comment.