diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 87fd2a8faf..ee4c204563 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -173,48 +173,49 @@ async def _register_in_batches( self, resource_config: ResourceConfig, user_agent_type: UserAgentType ) -> tuple[list[Entity], list[Exception]]: results, errors = await self._get_resource_raw_results(resource_config) - async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = [] - raw_results: RAW_RESULT = [] - for result in results: - if isinstance(result, dict): - raw_results.append(result) - else: - async_generators.append(result) - - send_raw_data_examples_amount = ( - SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0 - ) - all_entities, register_errors = await self._register_resource_raw( - resource_config, - raw_results, - user_agent_type, - send_raw_data_examples_amount=send_raw_data_examples_amount, - ) - errors.extend(register_errors) - passed_entities = list(all_entities.passed) - - for generator in async_generators: - try: - async for items in generator: - if send_raw_data_examples_amount > 0: - send_raw_data_examples_amount = max( - 0, send_raw_data_examples_amount - len(passed_entities) - ) - - entities, register_errors = await self._register_resource_raw( - resource_config, - items, - user_agent_type, - send_raw_data_examples_amount=send_raw_data_examples_amount, - ) - errors.extend(register_errors) - passed_entities.extend(entities.passed) - except* OceanAbortException as error: - errors.append(error) - - logger.info( - f"Finished registering change for {len(results)} raw results for kind: {resource_config.kind}. {len(passed_entities)} entities were affected" - ) + passed_entities = [] + # async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = [] + # raw_results: RAW_RESULT = [] + # for result in results: + # if isinstance(result, dict): + # raw_results.append(result) + # else: + # async_generators.append(result) + # + # send_raw_data_examples_amount = ( + # SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0 + # ) + # all_entities, register_errors = await self._register_resource_raw( + # resource_config, + # raw_results, + # user_agent_type, + # send_raw_data_examples_amount=send_raw_data_examples_amount, + # ) + # errors.extend(register_errors) + # passed_entities = list(all_entities.passed) + # + # for generator in async_generators: + # try: + # async for items in generator: + # if send_raw_data_examples_amount > 0: + # send_raw_data_examples_amount = max( + # 0, send_raw_data_examples_amount - len(passed_entities) + # ) + # + # entities, register_errors = await self._register_resource_raw( + # resource_config, + # items, + # user_agent_type, + # send_raw_data_examples_amount=send_raw_data_examples_amount, + # ) + # errors.extend(register_errors) + # passed_entities.extend(entities.passed) + # except* OceanAbortException as error: + # errors.append(error) + # + # logger.info( + # f"Finished registering change for {len(results)} raw results for kind: {resource_config.kind}. {len(passed_entities)} entities were affected" + # ) return passed_entities, errors async def register_raw( @@ -426,20 +427,20 @@ async def sync_raw_all( use_cache=False ) logger.info(f"Resync will use the following mappings: {app_config.dict()}") - try: - did_fetched_current_state = True - entities_at_port = await ocean.port_client.search_entities( - user_agent_type - ) - except httpx.HTTPError as e: - logger.warning( - "Failed to fetch the current state of entities at Port. " - "Skipping delete phase due to unknown initial state. " - f"Error: {e}\n" - f"Response status code: {e.response.status_code if isinstance(e, httpx.HTTPStatusError) else None}\n" - f"Response content: {e.response.text if isinstance(e, httpx.HTTPStatusError) else None}\n" - ) - did_fetched_current_state = False + # try: + # did_fetched_current_state = True + # entities_at_port = await ocean.port_client.search_entities( + # user_agent_type + # ) + # except httpx.HTTPError as e: + # logger.warning( + # "Failed to fetch the current state of entities at Port. " + # "Skipping delete phase due to unknown initial state. " + # f"Error: {e}\n" + # f"Response status code: {e.response.status_code if isinstance(e, httpx.HTTPStatusError) else None}\n" + # f"Response content: {e.response.text if isinstance(e, httpx.HTTPStatusError) else None}\n" + # ) + # did_fetched_current_state = False creation_results: list[tuple[list[Entity], list[Exception]]] = [] @@ -458,36 +459,36 @@ async def sync_raw_all( except asyncio.CancelledError as e: logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state") raise - else: - if not did_fetched_current_state: - logger.warning( - "Due to an error before the resync, the previous state of entities at Port is unknown." - " Skipping delete phase due to unknown initial state." - ) - return - - logger.info("Starting resync diff calculation") - flat_created_entities, errors = zip_and_sum(creation_results) or [ - [], - [], - ] - - if errors: - message = f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state" - error_group = ExceptionGroup( - f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state", - errors, - ) - if not silent: - raise error_group - - logger.error(message, exc_info=error_group) - else: - logger.info( - f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}" - ) - await self.entities_state_applier.delete_diff( - {"before": entities_at_port, "after": flat_created_entities}, - user_agent_type, - ) - logger.info("Resync finished successfully") + # else: + # if not did_fetched_current_state: + # logger.warning( + # "Due to an error before the resync, the previous state of entities at Port is unknown." + # " Skipping delete phase due to unknown initial state." + # ) + # return + # + # logger.info("Starting resync diff calculation") + # flat_created_entities, errors = zip_and_sum(creation_results) or [ + # [], + # [], + # ] + # + # if errors: + # message = f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state" + # error_group = ExceptionGroup( + # f"Resync failed with {len(errors)}. Skipping delete phase due to incomplete state", + # errors, + # ) + # if not silent: + # raise error_group + # + # logger.error(message, exc_info=error_group) + # else: + # logger.info( + # f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}" + # ) + # await self.entities_state_applier.delete_diff( + # {"before": entities_at_port, "after": flat_created_entities}, + # user_agent_type, + # ) + # logger.info("Resync finished successfully") diff --git a/pyproject.toml b/pyproject.toml index eb837ce00b..38811bc866 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.12.2-dev02" +version = "0.12.2-dev03" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"