From da1e9fdd30b01f9b9ec956b2a3ae2c408bf524ad Mon Sep 17 00:00:00 2001
From: yair
Date: Thu, 10 Oct 2024 15:55:49 +0300
Subject: [PATCH] process pool executor

---
 port_ocean/clients/port/mixins/entities.py    | 79 ++++++++-------
 .../entities_state_applier/port/applier.py    | 63 ++++++------
 .../core/integrations/mixins/sync_raw.py      | 95 ++++++++++---------
 pyproject.toml                                |  2 +-
 4 files changed, 119 insertions(+), 120 deletions(-)

diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py
index 6d3659dfcb..f8d5f63fa3 100644
--- a/port_ocean/clients/port/mixins/entities.py
+++ b/port_ocean/clients/port/mixins/entities.py
@@ -56,33 +56,33 @@ async def upsert_entity(
                 f"entity: {entity.identifier} of "
                 f"blueprint: {entity.blueprint}"
             )
-        # handle_status_code(response, should_raise)
-        # result = response.json()
-        #
-        # result_entity = (
-        #     Entity.parse_obj(result["entity"]) if result.get("entity") else entity
-        # )
-        #
-        # # Happens when upsert fails and search identifier is defined.
-        # # We return None to ignore the entity later in the delete process
-        # if result_entity.is_using_search_identifier:
-        #     return None
-        #
-        # # In order to save memory we'll keep only the identifier, blueprint and relations of the
-        # # upserted entity result for later calculations
-        # reduced_entity = Entity(
-        #     identifier=result_entity.identifier, blueprint=result_entity.blueprint
-        # )
-        #
-        # # Turning dict typed relations (raw search relations) is required
-        # # for us to be able to successfully calculate the participation related entities
-        # # and ignore the ones that don't as they weren't upserted
-        # reduced_entity.relations = {
-        #     key: None if isinstance(relation, dict) else relation
-        #     for key, relation in result_entity.relations.items()
-        # }
-
-        return None
+        handle_status_code(response, should_raise)
+        result = response.json()
+
+        result_entity = (
+            Entity.parse_obj(result["entity"]) if result.get("entity") else entity
+        )
+
+        # Happens when upsert fails and search identifier is defined.
+        # We return None to ignore the entity later in the delete process
+        if result_entity.is_using_search_identifier:
+            return None
+
+        # In order to save memory we'll keep only the identifier, blueprint and relations of the
+        # upserted entity result for later calculations
+        reduced_entity = Entity(
+            identifier=result_entity.identifier, blueprint=result_entity.blueprint
+        )
+
+        # Turning dict typed relations (raw search relations) is required
+        # for us to be able to successfully calculate the participation related entities
+        # and ignore the ones that don't as they weren't upserted
+        reduced_entity.relations = {
+            key: None if isinstance(relation, dict) else relation
+            for key, relation in result_entity.relations.items()
+        }
+
+        return reduced_entity
 
     async def batch_upsert_entities(
         self,
@@ -91,7 +91,7 @@
         user_agent_type: UserAgentType | None = None,
         should_raise: bool = True,
     ) -> list[Entity]:
-        await asyncio.gather(
+        modified_entities_results = await asyncio.gather(
             *(
                 self.upsert_entity(
                     entity,
@@ -103,18 +103,17 @@
             ),
             return_exceptions=True,
         )
-        return []
-        # entity_results = [
-        #     entity for entity in modified_entities_results if isinstance(entity, Entity)
-        # ]
-        # if not should_raise:
-        #     return entity_results
-        #
-        # for entity_result in modified_entities_results:
-        #     if isinstance(entity_result, Exception):
-        #         raise entity_result
-        #
-        # return entity_results
+        entity_results = [
+            entity for entity in modified_entities_results if isinstance(entity, Entity)
+        ]
+        if not should_raise:
+            return entity_results
+
+        for entity_result in modified_entities_results:
+            if isinstance(entity_result, Exception):
+                raise entity_result
+
+        return entity_results
 
     async def delete_entity(
         self,
diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py
index 1badaf5022..c1ab47dd75 100644
--- a/port_ocean/core/handlers/entities_state_applier/port/applier.py
+++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py
@@ -97,38 +97,37 @@ async def upsert(
         self, entities: list[Entity], user_agent_type: UserAgentType
     ) -> list[Entity]:
         logger.info(f"Upserting {len(entities)} entities")
-        # modified_entities: list[Entity] = []
-        # if event.port_app_config.create_missing_related_entities:
-        await self.context.port_client.batch_upsert_entities(
-            entities,
-            event.port_app_config.get_port_request_options(),
-            user_agent_type,
-            should_raise=False,
-        )
-        # else:
-        #     entities_with_search_identifier: list[Entity] = []
-        #     entities_without_search_identifier: list[Entity] = []
-        #     for entity in entities:
-        #         if entity.is_using_search_identifier:
-        #             entities_with_search_identifier.append(entity)
-        #         else:
-        #             entities_without_search_identifier.append(entity)
-        #
-        #     ordered_created_entities = reversed(
-        #         entities_with_search_identifier
-        #         + order_by_entities_dependencies(entities_without_search_identifier)
-        #     )
-        #     for entity in ordered_created_entities:
-        #         upsertedEntity = await self.context.port_client.upsert_entity(
-        #             entity,
-        #             event.port_app_config.get_port_request_options(),
-        #             user_agent_type,
-        #             should_raise=False,
-        #         )
-        #         if upsertedEntity:
-        #             modified_entities.append(upsertedEntity)
-        # return modified_entities
-        return []
+        modified_entities: list[Entity] = []
+        if event.port_app_config.create_missing_related_entities:
+            modified_entities = await self.context.port_client.batch_upsert_entities(
+                entities,
+                event.port_app_config.get_port_request_options(),
+                user_agent_type,
+                should_raise=False,
+            )
+        else:
+            entities_with_search_identifier: list[Entity] = []
+            entities_without_search_identifier: list[Entity] = []
+            for entity in entities:
+                if entity.is_using_search_identifier:
+                    entities_with_search_identifier.append(entity)
+                else:
+                    entities_without_search_identifier.append(entity)
+
+            ordered_created_entities = reversed(
+                entities_with_search_identifier
+                + order_by_entities_dependencies(entities_without_search_identifier)
+            )
+            for entity in ordered_created_entities:
+                upsertedEntity = await self.context.port_client.upsert_entity(
+                    entity,
+                    event.port_app_config.get_port_request_options(),
+                    user_agent_type,
+                    should_raise=False,
+                )
+                if upsertedEntity:
+                    modified_entities.append(upsertedEntity)
+        return modified_entities
 
     async def delete(
         self, entities: list[Entity], user_agent_type: UserAgentType
diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py
index afbf71c255..86821db6e8 100644
--- a/port_ocean/core/integrations/mixins/sync_raw.py
+++ b/port_ocean/core/integrations/mixins/sync_raw.py
@@ -3,6 +3,7 @@
 import typing
 from typing import Callable, Awaitable, Any
 
+import httpx
 from loguru import logger
 
 from port_ocean.clients.port.types import UserAgentType
@@ -425,20 +426,20 @@ async def sync_raw_all(
                 use_cache=False
             )
             logger.info(f"Resync will use the following mappings: {app_config.dict()}")
-            # try:
-            #     did_fetched_current_state = True
-            #     entities_at_port = await ocean.port_client.search_entities(
-            #         user_agent_type
-            #     )
-            # except httpx.HTTPError as e:
-            #     logger.warning(
-            #         "Failed to fetch the current state of entities at Port. "
-            #         "Skipping delete phase due to unknown initial state. "
-            #         f"Error: {e}\n"
-            #         f"Response status code: {e.response.status_code if isinstance(e, httpx.HTTPStatusError) else None}\n"
-            #         f"Response content: {e.response.text if isinstance(e, httpx.HTTPStatusError) else None}\n"
-            #     )
-            #     did_fetched_current_state = False
+            try:
+                did_fetched_current_state = True
+                entities_at_port = await ocean.port_client.search_entities(
+                    user_agent_type
+                )
+            except httpx.HTTPError as e:
+                logger.warning(
+                    "Failed to fetch the current state of entities at Port. "
+                    "Skipping delete phase due to unknown initial state. "
+                    f"Error: {e}\n"
+                    f"Response status code: {e.response.status_code if isinstance(e, httpx.HTTPStatusError) else None}\n"
+                    f"Response content: {e.response.text if isinstance(e, httpx.HTTPStatusError) else None}\n"
+                )
+                did_fetched_current_state = False
 
             creation_results: list[tuple[list[Entity], list[Exception]]] = []
 
@@ -457,36 +458,36 @@
             except asyncio.CancelledError as e:
                 logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state")
                 raise
-            # else:
-            #     if not did_fetched_current_state:
-            #         logger.warning(
-            #             "Due to an error before the resync, the previous state of entities at Port is unknown."
-            #             " Skipping delete phase due to unknown initial state."
-            #         )
-            #         return
-            #
-            #     logger.info("Starting resync diff calculation")
-            #     flat_created_entities, errors = zip_and_sum(creation_results) or [
-            #         [],
-            #         [],
-            #     ]
-            #
-            #     if errors:
-            #         message = f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state"
-            #         error_group = ExceptionGroup(
-            #             f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state",
-            #             errors,
-            #         )
-            #         if not silent:
-            #             raise error_group
-            #
-            #         logger.error(message, exc_info=error_group)
-            #     else:
-            #         logger.info(
-            #             f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}"
-            #         )
-            #         await self.entities_state_applier.delete_diff(
-            #             {"before": entities_at_port, "after": flat_created_entities},
-            #             user_agent_type,
-            #         )
-            #         logger.info("Resync finished successfully")
+            else:
+                if not did_fetched_current_state:
+                    logger.warning(
+                        "Due to an error before the resync, the previous state of entities at Port is unknown."
+                        " Skipping delete phase due to unknown initial state."
+                    )
+                    return
+
+                logger.info("Starting resync diff calculation")
+                flat_created_entities, errors = zip_and_sum(creation_results) or [
+                    [],
+                    [],
+                ]
+
+                if errors:
+                    message = f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state"
+                    error_group = ExceptionGroup(
+                        f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state",
+                        errors,
+                    )
+                    if not silent:
+                        raise error_group
+
+                    logger.error(message, exc_info=error_group)
+                else:
+                    logger.info(
+                        f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}"
+                    )
+                    await self.entities_state_applier.delete_diff(
+                        {"before": entities_at_port, "after": flat_created_entities},
+                        user_agent_type,
+                    )
+                    logger.info("Resync finished successfully")
diff --git a/pyproject.toml b/pyproject.toml
index 60a556be89..8246a22fa2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "port-ocean"
-version = "0.12.2-dev05"
+version = "0.12.2-dev06"
 description = "Port Ocean is a CLI tool for managing your Port projects."
 readme = "README.md"
 homepage = "https://app.getport.io"