diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index ee4c204563..346ed79687 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -3,7 +3,6 @@ import typing from typing import Callable, Awaitable, Any -import httpx from loguru import logger from port_ocean.clients.port.types import UserAgentType @@ -174,48 +173,48 @@ async def _register_in_batches( ) -> tuple[list[Entity], list[Exception]]: results, errors = await self._get_resource_raw_results(resource_config) passed_entities = [] - # async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = [] - # raw_results: RAW_RESULT = [] - # for result in results: - # if isinstance(result, dict): - # raw_results.append(result) - # else: - # async_generators.append(result) - # - # send_raw_data_examples_amount = ( - # SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0 - # ) - # all_entities, register_errors = await self._register_resource_raw( - # resource_config, - # raw_results, - # user_agent_type, - # send_raw_data_examples_amount=send_raw_data_examples_amount, - # ) - # errors.extend(register_errors) - # passed_entities = list(all_entities.passed) - # - # for generator in async_generators: - # try: - # async for items in generator: - # if send_raw_data_examples_amount > 0: - # send_raw_data_examples_amount = max( - # 0, send_raw_data_examples_amount - len(passed_entities) - # ) - # - # entities, register_errors = await self._register_resource_raw( - # resource_config, - # items, - # user_agent_type, - # send_raw_data_examples_amount=send_raw_data_examples_amount, - # ) - # errors.extend(register_errors) - # passed_entities.extend(entities.passed) - # except* OceanAbortException as error: - # errors.append(error) - # - # logger.info( - # f"Finished registering change for {len(results)} raw results for kind: {resource_config.kind}. {len(passed_entities)} entities were affected" - # ) + async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = [] + raw_results: RAW_RESULT = [] + for result in results: + if isinstance(result, dict): + raw_results.append(result) + else: + async_generators.append(result) + + send_raw_data_examples_amount = ( + SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0 + ) + all_entities, register_errors = await self._register_resource_raw( + resource_config, + raw_results, + user_agent_type, + send_raw_data_examples_amount=send_raw_data_examples_amount, + ) + errors.extend(register_errors) + passed_entities = list(all_entities.passed) + + for generator in async_generators: + try: + async for items in generator: + if send_raw_data_examples_amount > 0: + send_raw_data_examples_amount = max( + 0, send_raw_data_examples_amount - len(passed_entities) + ) + + # entities, register_errors = await self._register_resource_raw( + # resource_config, + # items, + # user_agent_type, + # send_raw_data_examples_amount=send_raw_data_examples_amount, + # ) + # errors.extend(register_errors) + # passed_entities.extend(entities.passed) + except* OceanAbortException as error: + errors.append(error) + + logger.info( + f"Finished registering change for {len(results)} raw results for kind: {resource_config.kind}. {len(passed_entities)} entities were affected" + ) return passed_entities, errors async def register_raw( diff --git a/pyproject.toml b/pyproject.toml index 49b82792d5..9de79fe395 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.12.2-dev07" +version = "0.12.2-dev10" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"