Skip to content

Commit

Permalink
process pool executor
Browse files — browse the repository at this point in the history
  • Loading branch information
yairsimantov20 committed Oct 10, 2024
1 parent 91e1777 commit da1e9fd
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 120 deletions.
79 changes: 39 additions & 40 deletions port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,33 +56,33 @@ async def upsert_entity(
f"entity: {entity.identifier} of "
f"blueprint: {entity.blueprint}"
)
# handle_status_code(response, should_raise)
# result = response.json()
#
# result_entity = (
# Entity.parse_obj(result["entity"]) if result.get("entity") else entity
# )
#
# # Happens when upsert fails and search identifier is defined.
# # We return None to ignore the entity later in the delete process
# if result_entity.is_using_search_identifier:
# return None
#
# # In order to save memory we'll keep only the identifier, blueprint and relations of the
# # upserted entity result for later calculations
# reduced_entity = Entity(
# identifier=result_entity.identifier, blueprint=result_entity.blueprint
# )
#
# # Turning dict typed relations (raw search relations) is required
# # for us to be able to successfully calculate the participation related entities
# # and ignore the ones that don't as they weren't upserted
# reduced_entity.relations = {
# key: None if isinstance(relation, dict) else relation
# for key, relation in result_entity.relations.items()
# }

return None
handle_status_code(response, should_raise)
result = response.json()

result_entity = (
Entity.parse_obj(result["entity"]) if result.get("entity") else entity
)

# Happens when upsert fails and search identifier is defined.
# We return None to ignore the entity later in the delete process
if result_entity.is_using_search_identifier:
return None

# In order to save memory we'll keep only the identifier, blueprint and relations of the
# upserted entity result for later calculations
reduced_entity = Entity(
identifier=result_entity.identifier, blueprint=result_entity.blueprint
)

# Turning dict typed relations (raw search relations) is required
# for us to be able to successfully calculate the participation related entities
# and ignore the ones that don't as they weren't upserted
reduced_entity.relations = {
key: None if isinstance(relation, dict) else relation
for key, relation in result_entity.relations.items()
}

return reduced_entity

async def batch_upsert_entities(
self,
Expand All @@ -91,7 +91,7 @@ async def batch_upsert_entities(
user_agent_type: UserAgentType | None = None,
should_raise: bool = True,
) -> list[Entity]:
await asyncio.gather(
modified_entities_results = await asyncio.gather(
*(
self.upsert_entity(
entity,
Expand All @@ -103,18 +103,17 @@ async def batch_upsert_entities(
),
return_exceptions=True,
)
return []
# entity_results = [
# entity for entity in modified_entities_results if isinstance(entity, Entity)
# ]
# if not should_raise:
# return entity_results
#
# for entity_result in modified_entities_results:
# if isinstance(entity_result, Exception):
# raise entity_result
#
# return entity_results
entity_results = [
entity for entity in modified_entities_results if isinstance(entity, Entity)
]
if not should_raise:
return entity_results

for entity_result in modified_entities_results:
if isinstance(entity_result, Exception):
raise entity_result

return entity_results

async def delete_entity(
self,
Expand Down
63 changes: 31 additions & 32 deletions port_ocean/core/handlers/entities_state_applier/port/applier.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,38 +97,37 @@ async def upsert(
self, entities: list[Entity], user_agent_type: UserAgentType
) -> list[Entity]:
logger.info(f"Upserting {len(entities)} entities")
# modified_entities: list[Entity] = []
# if event.port_app_config.create_missing_related_entities:
await self.context.port_client.batch_upsert_entities(
entities,
event.port_app_config.get_port_request_options(),
user_agent_type,
should_raise=False,
)
# else:
# entities_with_search_identifier: list[Entity] = []
# entities_without_search_identifier: list[Entity] = []
# for entity in entities:
# if entity.is_using_search_identifier:
# entities_with_search_identifier.append(entity)
# else:
# entities_without_search_identifier.append(entity)
#
# ordered_created_entities = reversed(
# entities_with_search_identifier
# + order_by_entities_dependencies(entities_without_search_identifier)
# )
# for entity in ordered_created_entities:
# upsertedEntity = await self.context.port_client.upsert_entity(
# entity,
# event.port_app_config.get_port_request_options(),
# user_agent_type,
# should_raise=False,
# )
# if upsertedEntity:
# modified_entities.append(upsertedEntity)
# return modified_entities
return []
modified_entities: list[Entity] = []
if event.port_app_config.create_missing_related_entities:
modified_entities = await self.context.port_client.batch_upsert_entities(
entities,
event.port_app_config.get_port_request_options(),
user_agent_type,
should_raise=False,
)
else:
entities_with_search_identifier: list[Entity] = []
entities_without_search_identifier: list[Entity] = []
for entity in entities:
if entity.is_using_search_identifier:
entities_with_search_identifier.append(entity)
else:
entities_without_search_identifier.append(entity)

ordered_created_entities = reversed(
entities_with_search_identifier
+ order_by_entities_dependencies(entities_without_search_identifier)
)
for entity in ordered_created_entities:
upsertedEntity = await self.context.port_client.upsert_entity(
entity,
event.port_app_config.get_port_request_options(),
user_agent_type,
should_raise=False,
)
if upsertedEntity:
modified_entities.append(upsertedEntity)
return modified_entities

async def delete(
self, entities: list[Entity], user_agent_type: UserAgentType
Expand Down
95 changes: 48 additions & 47 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import typing
from typing import Callable, Awaitable, Any

import httpx
from loguru import logger

from port_ocean.clients.port.types import UserAgentType
Expand Down Expand Up @@ -425,20 +426,20 @@ async def sync_raw_all(
use_cache=False
)
logger.info(f"Resync will use the following mappings: {app_config.dict()}")
# try:
# did_fetched_current_state = True
# entities_at_port = await ocean.port_client.search_entities(
# user_agent_type
# )
# except httpx.HTTPError as e:
# logger.warning(
# "Failed to fetch the current state of entities at Port. "
# "Skipping delete phase due to unknown initial state. "
# f"Error: {e}\n"
# f"Response status code: {e.response.status_code if isinstance(e, httpx.HTTPStatusError) else None}\n"
# f"Response content: {e.response.text if isinstance(e, httpx.HTTPStatusError) else None}\n"
# )
# did_fetched_current_state = False
try:
did_fetched_current_state = True
entities_at_port = await ocean.port_client.search_entities(
user_agent_type
)
except httpx.HTTPError as e:
logger.warning(
"Failed to fetch the current state of entities at Port. "
"Skipping delete phase due to unknown initial state. "
f"Error: {e}\n"
f"Response status code: {e.response.status_code if isinstance(e, httpx.HTTPStatusError) else None}\n"
f"Response content: {e.response.text if isinstance(e, httpx.HTTPStatusError) else None}\n"
)
did_fetched_current_state = False

creation_results: list[tuple[list[Entity], list[Exception]]] = []

Expand All @@ -457,36 +458,36 @@ async def sync_raw_all(
except asyncio.CancelledError as e:
logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state")
raise
# else:
# if not did_fetched_current_state:
# logger.warning(
# "Due to an error before the resync, the previous state of entities at Port is unknown."
# " Skipping delete phase due to unknown initial state."
# )
# return
#
# logger.info("Starting resync diff calculation")
# flat_created_entities, errors = zip_and_sum(creation_results) or [
# [],
# [],
# ]
#
# if errors:
# message = f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state"
# error_group = ExceptionGroup(
# f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state",
# errors,
# )
# if not silent:
# raise error_group
#
# logger.error(message, exc_info=error_group)
# else:
# logger.info(
# f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}"
# )
# await self.entities_state_applier.delete_diff(
# {"before": entities_at_port, "after": flat_created_entities},
# user_agent_type,
# )
# logger.info("Resync finished successfully")
else:
if not did_fetched_current_state:
logger.warning(
"Due to an error before the resync, the previous state of entities at Port is unknown."
" Skipping delete phase due to unknown initial state."
)
return

logger.info("Starting resync diff calculation")
flat_created_entities, errors = zip_and_sum(creation_results) or [
[],
[],
]

if errors:
message = f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state"
error_group = ExceptionGroup(
f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state",
errors,
)
if not silent:
raise error_group

logger.error(message, exc_info=error_group)
else:
logger.info(
f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}"
)
await self.entities_state_applier.delete_diff(
{"before": entities_at_port, "after": flat_created_entities},
user_agent_type,
)
logger.info("Resync finished successfully")
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.12.2-dev05"
version = "0.12.2-dev06"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit da1e9fd

Please sign in to comment.