Skip to content

Commit

Permalink
raw
Browse files Browse the repository at this point in the history
  • Loading branch information
yairsimantov20 committed Oct 10, 2024
1 parent 6904768 commit df12676
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 90 deletions.
179 changes: 90 additions & 89 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,48 +173,49 @@ async def _register_in_batches(
self, resource_config: ResourceConfig, user_agent_type: UserAgentType
) -> tuple[list[Entity], list[Exception]]:
results, errors = await self._get_resource_raw_results(resource_config)
async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = []
raw_results: RAW_RESULT = []
for result in results:
if isinstance(result, dict):
raw_results.append(result)
else:
async_generators.append(result)

send_raw_data_examples_amount = (
SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0
)
all_entities, register_errors = await self._register_resource_raw(
resource_config,
raw_results,
user_agent_type,
send_raw_data_examples_amount=send_raw_data_examples_amount,
)
errors.extend(register_errors)
passed_entities = list(all_entities.passed)

for generator in async_generators:
try:
async for items in generator:
if send_raw_data_examples_amount > 0:
send_raw_data_examples_amount = max(
0, send_raw_data_examples_amount - len(passed_entities)
)

entities, register_errors = await self._register_resource_raw(
resource_config,
items,
user_agent_type,
send_raw_data_examples_amount=send_raw_data_examples_amount,
)
errors.extend(register_errors)
passed_entities.extend(entities.passed)
except* OceanAbortException as error:
errors.append(error)

logger.info(
f"Finished registering change for {len(results)} raw results for kind: {resource_config.kind}. {len(passed_entities)} entities were affected"
)
passed_entities = []
# async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = []
# raw_results: RAW_RESULT = []
# for result in results:
# if isinstance(result, dict):
# raw_results.append(result)
# else:
# async_generators.append(result)
#
# send_raw_data_examples_amount = (
# SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0
# )
# all_entities, register_errors = await self._register_resource_raw(
# resource_config,
# raw_results,
# user_agent_type,
# send_raw_data_examples_amount=send_raw_data_examples_amount,
# )
# errors.extend(register_errors)
# passed_entities = list(all_entities.passed)
#
# for generator in async_generators:
# try:
# async for items in generator:
# if send_raw_data_examples_amount > 0:
# send_raw_data_examples_amount = max(
# 0, send_raw_data_examples_amount - len(passed_entities)
# )
#
# entities, register_errors = await self._register_resource_raw(
# resource_config,
# items,
# user_agent_type,
# send_raw_data_examples_amount=send_raw_data_examples_amount,
# )
# errors.extend(register_errors)
# passed_entities.extend(entities.passed)
# except* OceanAbortException as error:
# errors.append(error)
#
# logger.info(
# f"Finished registering change for {len(results)} raw results for kind: {resource_config.kind}. {len(passed_entities)} entities were affected"
# )
return passed_entities, errors

async def register_raw(
Expand Down Expand Up @@ -426,20 +427,20 @@ async def sync_raw_all(
use_cache=False
)
logger.info(f"Resync will use the following mappings: {app_config.dict()}")
try:
did_fetched_current_state = True
entities_at_port = await ocean.port_client.search_entities(
user_agent_type
)
except httpx.HTTPError as e:
logger.warning(
"Failed to fetch the current state of entities at Port. "
"Skipping delete phase due to unknown initial state. "
f"Error: {e}\n"
f"Response status code: {e.response.status_code if isinstance(e, httpx.HTTPStatusError) else None}\n"
f"Response content: {e.response.text if isinstance(e, httpx.HTTPStatusError) else None}\n"
)
did_fetched_current_state = False
# try:
# did_fetched_current_state = True
# entities_at_port = await ocean.port_client.search_entities(
# user_agent_type
# )
# except httpx.HTTPError as e:
# logger.warning(
# "Failed to fetch the current state of entities at Port. "
# "Skipping delete phase due to unknown initial state. "
# f"Error: {e}\n"
# f"Response status code: {e.response.status_code if isinstance(e, httpx.HTTPStatusError) else None}\n"
# f"Response content: {e.response.text if isinstance(e, httpx.HTTPStatusError) else None}\n"
# )
# did_fetched_current_state = False

creation_results: list[tuple[list[Entity], list[Exception]]] = []

Expand All @@ -458,36 +459,36 @@ async def sync_raw_all(
except asyncio.CancelledError as e:
logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state")
raise
else:
if not did_fetched_current_state:
logger.warning(
"Due to an error before the resync, the previous state of entities at Port is unknown."
" Skipping delete phase due to unknown initial state."
)
return

logger.info("Starting resync diff calculation")
flat_created_entities, errors = zip_and_sum(creation_results) or [
[],
[],
]

if errors:
message = f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state"
error_group = ExceptionGroup(
f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state",
errors,
)
if not silent:
raise error_group

logger.error(message, exc_info=error_group)
else:
logger.info(
f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}"
)
await self.entities_state_applier.delete_diff(
{"before": entities_at_port, "after": flat_created_entities},
user_agent_type,
)
logger.info("Resync finished successfully")
# else:
# if not did_fetched_current_state:
# logger.warning(
# "Due to an error before the resync, the previous state of entities at Port is unknown."
# " Skipping delete phase due to unknown initial state."
# )
# return
#
# logger.info("Starting resync diff calculation")
# flat_created_entities, errors = zip_and_sum(creation_results) or [
# [],
# [],
# ]
#
# if errors:
# message = f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state"
# error_group = ExceptionGroup(
# f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state",
# errors,
# )
# if not silent:
# raise error_group
#
# logger.error(message, exc_info=error_group)
# else:
# logger.info(
# f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}"
# )
# await self.entities_state_applier.delete_diff(
# {"before": entities_at_port, "after": flat_created_entities},
# user_agent_type,
# )
# logger.info("Resync finished successfully")
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.12.2-dev02"
version = "0.12.2-dev03"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit df12676

Please sign in to comment.