From 291a4c34b120f7f2b87ad02ef7e3608d6bea2773 Mon Sep 17 00:00:00 2001 From: Iyanuoluwa Adebayo Date: Wed, 8 Jan 2025 11:09:05 +0100 Subject: [PATCH 01/10] Added `ProjectV3GetRequestsPerMinutePerProject` quota limit for project --- integrations/gcp/CHANGELOG.md | 8 +++++ .../gcp_core/helpers/ratelimiter/overrides.py | 9 +++++ .../gcp/gcp_core/search/resource_searches.py | 35 ++++++++++++------- integrations/gcp/gcp_core/utils.py | 14 ++++++++ integrations/gcp/main.py | 6 ++-- integrations/gcp/pyproject.toml | 2 +- 6 files changed, 58 insertions(+), 16 deletions(-) diff --git a/integrations/gcp/CHANGELOG.md b/integrations/gcp/CHANGELOG.md index 2927ad8dbf..6ad93b6c0f 100644 --- a/integrations/gcp/CHANGELOG.md +++ b/integrations/gcp/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.1.89 (2025-01-08) + + +### Improvements + +- Added rate limiting support for `ProjectsV3GetRequestsPerMinutePerProject` to handle GCP project quota limits during resyncs and real-time event processing + + ## 0.1.88 (2025-01-07) diff --git a/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py b/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py index 4f3d59442a..5aef0c3bff 100644 --- a/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py +++ b/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py @@ -12,6 +12,10 @@ class PubSubAPI(ResourceBoundedSemaphore): service = "pubsub.googleapis.com" +class CloudResourceManagerAPI(ResourceBoundedSemaphore): + service = "cloudresourcemanager.googleapis.com" + + class SearchAllResourcesQpmPerProject(CloudAssetAPI): quota_id = "apiSearchAllResourcesQpmPerProject" container_type = ContainerType.PROJECT @@ -20,3 +24,8 @@ class SearchAllResourcesQpmPerProject(CloudAssetAPI): class PubSubAdministratorPerMinutePerProject(PubSubAPI): quota_id = "administratorPerMinutePerProject" container_type = ContainerType.PROJECT + + +class ProjectsRequestsPerMinutePerProject(CloudResourceManagerAPI): + quota_id = "ProjectV3GetRequestsPerMinutePerProject" + container_type = ContainerType.PROJECT diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py index 6fab7b5360..7eb97fa6e3 100644 --- a/integrations/gcp/gcp_core/search/resource_searches.py +++ b/integrations/gcp/gcp_core/search/resource_searches.py @@ -13,6 +13,8 @@ from google.pubsub_v1.services.publisher import PublisherAsyncClient from google.pubsub_v1.services.subscriber import SubscriberAsyncClient from loguru import logger + +from gcp_core.utils import resolve_request_controllers from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_ITEM from port_ocean.utils.cache import cache_iterator_result from gcp_core.errors import ResourceNotFoundError @@ -177,7 +179,7 @@ async def list_all_subscriptions_per_project( @cache_iterator_result() -async def search_all_projects() -> ASYNC_GENERATOR_RESYNC_TYPE: +async def search_all_projects( **kwargs: Any) -> ASYNC_GENERATOR_RESYNC_TYPE: logger.info("Searching projects") async with ProjectsAsyncClient() as projects_client: async for projects in paginated_query( @@ -185,6 +187,7 @@ async def search_all_projects() -> ASYNC_GENERATOR_RESYNC_TYPE: "search_projects", {}, lambda response: parse_protobuf_messages(response.projects), + kwargs.get("rate_limiter"), ): yield projects @@ -214,15 +217,20 @@ async def search_all_organizations() -> ASYNC_GENERATOR_RESYNC_TYPE: async def get_single_project( - project_name: str, config: Optional[ProtoConfig] = None + project_name: str, config: Optional[ProtoConfig] = None, **kwargs: Any ) -> RAW_ITEM: async with ProjectsAsyncClient() as projects_client: - return parse_protobuf_message( - await projects_client.get_project( - name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT - ), - config, - ) + rate_limiter = kwargs.get("rate_limiter") + async with rate_limiter: + logger.info( + f"Executing get_single_project. Current rate limit: {rate_limiter.max_rate} requests per {rate_limiter.time_period} seconds." + ) + return parse_protobuf_message( + await projects_client.get_project( + name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT + ), + config, + ) async def get_single_folder( @@ -311,22 +319,23 @@ async def feed_event_to_resource( config: Optional[ProtoConfig] = None, ) -> RAW_ITEM: resource = None + live_event_projects_rate_limiter, _ = await resolve_request_controllers(asset_type) if asset_data.get("deleted") is True: resource = asset_data["priorAsset"]["resource"]["data"] - resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id, config) + resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id, config, rate_limiter=live_event_projects_rate_limiter) else: match asset_type: case AssetTypesWithSpecialHandling.TOPIC: topic_name = asset_name.replace("//pubsub.googleapis.com/", "") resource = await get_single_topic(topic_name, config) resource[EXTRA_PROJECT_FIELD] = await get_single_project( - project_id, config + project_id, config, rate_limiter=live_event_projects_rate_limiter ) case AssetTypesWithSpecialHandling.SUBSCRIPTION: topic_name = asset_name.replace("//pubsub.googleapis.com/", "") resource = await get_single_subscription(topic_name, config) resource[EXTRA_PROJECT_FIELD] = await get_single_project( - project_id, config + project_id, config, rate_limiter=live_event_projects_rate_limiter ) case AssetTypesWithSpecialHandling.FOLDER: folder_id = asset_name.replace( @@ -339,10 +348,10 @@ async def feed_event_to_resource( ) resource = await get_single_organization(organization_id, config) case AssetTypesWithSpecialHandling.PROJECT: - resource = await get_single_project(project_id, config) + resource = await get_single_project(project_id, config, rate_limiter=live_event_projects_rate_limiter) case _: resource = asset_data["asset"]["resource"]["data"] resource[EXTRA_PROJECT_FIELD] = await get_single_project( - project_id, config + project_id, config, rate_limiter=live_event_projects_rate_limiter ) return resource diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py index 422699c6fb..c500d5183d 100644 --- a/integrations/gcp/gcp_core/utils.py +++ b/integrations/gcp/gcp_core/utils.py @@ -17,10 +17,12 @@ from gcp_core.helpers.ratelimiter.overrides import ( SearchAllResourcesQpmPerProject, PubSubAdministratorPerMinutePerProject, + ProjectsRequestsPerMinutePerProject, ) search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject() pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject() +project_get_requests_per_minute_per_project = ProjectsRequestsPerMinutePerProject() EXTRA_PROJECT_FIELD = "__project" DEFAULT_CREDENTIALS_FILE_PATH = ( @@ -185,6 +187,18 @@ async def get_quotas_for_project( ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: try: match kind: + case AssetTypesWithSpecialHandling.PROJECT: + project_rate_limiter = ( + await project_get_requests_per_minute_per_project.limiter( + project_id + ) + ) + project_semaphore = ( + await project_get_requests_per_minute_per_project.semaphore( + project_id + ) + ) + return project_rate_limiter, project_semaphore case ( AssetTypesWithSpecialHandling.TOPIC | AssetTypesWithSpecialHandling.SUBSCRIPTION diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py index 482c6bf928..cfc11c59e4 100644 --- a/integrations/gcp/main.py +++ b/integrations/gcp/main.py @@ -62,7 +62,8 @@ async def _resolve_resync_method_for_resource( case AssetTypesWithSpecialHandling.ORGANIZATION: return search_all_organizations() case AssetTypesWithSpecialHandling.PROJECT: - return search_all_projects() + project_rate_limiter, _ = await resolve_request_controllers(kind) + return search_all_projects(rate_limiter=project_rate_limiter) case _: asset_rate_limiter, asset_semaphore = await resolve_request_controllers( kind @@ -106,7 +107,8 @@ async def resync_organizations(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=AssetTypesWithSpecialHandling.PROJECT) async def resync_projects(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: - async for batch in search_all_projects(): + resync_projects_rate_limiter, _ = await resolve_request_controllers(kind) + async for batch in search_all_projects(rate_limiter=resync_projects_rate_limiter): yield batch diff --git a/integrations/gcp/pyproject.toml b/integrations/gcp/pyproject.toml index b038488de4..4a5bdd87cf 100644 --- a/integrations/gcp/pyproject.toml +++ b/integrations/gcp/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gcp" -version = "0.1.88" +version = "0.1.89" description = "A GCP ocean integration" authors = ["Matan Geva "] From 07a7777388c4c66e1ed6081da86a2ded01a38314 Mon Sep 17 00:00:00 2001 From: Iyanuoluwa Adebayo Date: Wed, 8 Jan 2025 11:17:03 +0100 Subject: [PATCH 02/10] Update resource_searches.py --- .../gcp/gcp_core/search/resource_searches.py | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py index 7eb97fa6e3..0b543b5fcd 100644 --- a/integrations/gcp/gcp_core/search/resource_searches.py +++ b/integrations/gcp/gcp_core/search/resource_searches.py @@ -179,7 +179,7 @@ async def list_all_subscriptions_per_project( @cache_iterator_result() -async def search_all_projects( **kwargs: Any) -> ASYNC_GENERATOR_RESYNC_TYPE: +async def search_all_projects(**kwargs: Any) -> ASYNC_GENERATOR_RESYNC_TYPE: logger.info("Searching projects") async with ProjectsAsyncClient() as projects_client: async for projects in paginated_query( @@ -221,10 +221,18 @@ async def get_single_project( ) -> RAW_ITEM: async with ProjectsAsyncClient() as projects_client: rate_limiter = kwargs.get("rate_limiter") - async with rate_limiter: - logger.info( - f"Executing get_single_project. Current rate limit: {rate_limiter.max_rate} requests per {rate_limiter.time_period} seconds." - ) + if rate_limiter: + async with rate_limiter: + logger.info( + f"Executing get_single_project. Current rate limit: {rate_limiter.max_rate} requests per {rate_limiter.time_period} seconds." + ) + return parse_protobuf_message( + await projects_client.get_project( + name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT + ), + config, + ) + else: return parse_protobuf_message( await projects_client.get_project( name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT @@ -322,7 +330,9 @@ async def feed_event_to_resource( live_event_projects_rate_limiter, _ = await resolve_request_controllers(asset_type) if asset_data.get("deleted") is True: resource = asset_data["priorAsset"]["resource"]["data"] - resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id, config, rate_limiter=live_event_projects_rate_limiter) + resource[EXTRA_PROJECT_FIELD] = await get_single_project( + project_id, config, rate_limiter=live_event_projects_rate_limiter + ) else: match asset_type: case AssetTypesWithSpecialHandling.TOPIC: @@ -348,7 +358,9 @@ async def feed_event_to_resource( ) resource = await get_single_organization(organization_id, config) case AssetTypesWithSpecialHandling.PROJECT: - resource = await get_single_project(project_id, config, rate_limiter=live_event_projects_rate_limiter) + resource = await get_single_project( + project_id, config, rate_limiter=live_event_projects_rate_limiter + ) case _: resource = asset_data["asset"]["resource"]["data"] resource[EXTRA_PROJECT_FIELD] = await get_single_project( From ba22e2e40c45f0e0cf9a657ef7160701c454b291 Mon Sep 17 00:00:00 2001 From: Iyanuoluwa Adebayo Date: Thu, 9 Jan 2025 10:42:57 +0100 Subject: [PATCH 03/10] Added quota handling support for search project --- integrations/gcp/CHANGELOG.md | 2 +- .../gcp_core/helpers/ratelimiter/overrides.py | 7 ++- .../gcp/gcp_core/search/resource_searches.py | 2 +- integrations/gcp/gcp_core/utils.py | 44 +++++++++++++------ integrations/gcp/main.py | 4 +- 5 files changed, 40 insertions(+), 19 deletions(-) diff --git a/integrations/gcp/CHANGELOG.md b/integrations/gcp/CHANGELOG.md index 6ad93b6c0f..7c5a21e3f3 100644 --- a/integrations/gcp/CHANGELOG.md +++ b/integrations/gcp/CHANGELOG.md @@ -12,7 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Improvements -- Added rate limiting support for `ProjectsV3GetRequestsPerMinutePerProject` to handle GCP project quota limits during resyncs and real-time event processing +- Added rate limiting support for `ProjectsV3GetRequestsPerMinutePerProject` and `ProjectV3SearchRequestsPerMinutePerProject` to handle GCP project quota limits during resyncs and real-time event processing ## 0.1.88 (2025-01-07) diff --git a/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py b/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py index 5aef0c3bff..b9c1ec8ecf 100644 --- a/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py +++ b/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py @@ -26,6 +26,11 @@ class PubSubAdministratorPerMinutePerProject(PubSubAPI): container_type = ContainerType.PROJECT -class ProjectsRequestsPerMinutePerProject(CloudResourceManagerAPI): +class ProjectGetRequestsPerMinutePerProject(CloudResourceManagerAPI): quota_id = "ProjectV3GetRequestsPerMinutePerProject" container_type = ContainerType.PROJECT + + +class ProjectSearchRequestsPerMinutePerProject(CloudResourceManagerAPI): + quota_id = "ProjectV3SearchRequestsPerMinutePerProject" + container_type = ContainerType.PROJECT diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py index 0b543b5fcd..2a69754f21 100644 --- a/integrations/gcp/gcp_core/search/resource_searches.py +++ b/integrations/gcp/gcp_core/search/resource_searches.py @@ -327,7 +327,7 @@ async def feed_event_to_resource( config: Optional[ProtoConfig] = None, ) -> RAW_ITEM: resource = None - live_event_projects_rate_limiter, _ = await resolve_request_controllers(asset_type) + live_event_projects_rate_limiter, _ = await resolve_request_controllers(AssetTypesWithSpecialHandling.PROJECT, method="get") if asset_data.get("deleted") is True: resource = asset_data["priorAsset"]["resource"]["data"] resource[EXTRA_PROJECT_FIELD] = await get_single_project( diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py index c500d5183d..99db46b0d5 100644 --- a/integrations/gcp/gcp_core/utils.py +++ b/integrations/gcp/gcp_core/utils.py @@ -17,12 +17,14 @@ from gcp_core.helpers.ratelimiter.overrides import ( SearchAllResourcesQpmPerProject, PubSubAdministratorPerMinutePerProject, - ProjectsRequestsPerMinutePerProject, + ProjectGetRequestsPerMinutePerProject, + ProjectSearchRequestsPerMinutePerProject, ) search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject() pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject() -project_get_requests_per_minute_per_project = ProjectsRequestsPerMinutePerProject() +project_get_requests_per_minute_per_project = ProjectGetRequestsPerMinutePerProject() +project_search_requests_per_minute_per_project = ProjectSearchRequestsPerMinutePerProject() EXTRA_PROJECT_FIELD = "__project" DEFAULT_CREDENTIALS_FILE_PATH = ( @@ -183,22 +185,36 @@ def get_service_account_project_id() -> str: async def get_quotas_for_project( - project_id: str, kind: str + project_id: str, kind: str, **kwargs: Any ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: try: match kind: case AssetTypesWithSpecialHandling.PROJECT: - project_rate_limiter = ( - await project_get_requests_per_minute_per_project.limiter( - project_id + method = kwargs.get("method") + if method == "search": + project_rate_limiter = ( + await project_search_requests_per_minute_per_project.limiter( + project_id + ) ) - ) - project_semaphore = ( - await project_get_requests_per_minute_per_project.semaphore( - project_id + project_semaphore = ( + await project_search_requests_per_minute_per_project.semaphore( + project_id + ) ) - ) - return project_rate_limiter, project_semaphore + return project_rate_limiter, project_semaphore + else: + project_rate_limiter = ( + await project_get_requests_per_minute_per_project.limiter( + project_id + ) + ) + project_semaphore = ( + await project_get_requests_per_minute_per_project.semaphore( + project_id + ) + ) + return project_rate_limiter, project_semaphore case ( AssetTypesWithSpecialHandling.TOPIC | AssetTypesWithSpecialHandling.SUBSCRIPTION @@ -236,7 +252,7 @@ async def get_quotas_for_project( async def resolve_request_controllers( - kind: str, + kind: str, **kwargs: Any ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: service_account_project_id = get_service_account_project_id() - return await get_quotas_for_project(service_account_project_id, kind) + return await get_quotas_for_project(service_account_project_id, kind, **kwargs) diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py index cfc11c59e4..0c39c6b2da 100644 --- a/integrations/gcp/main.py +++ b/integrations/gcp/main.py @@ -62,7 +62,7 @@ async def _resolve_resync_method_for_resource( case AssetTypesWithSpecialHandling.ORGANIZATION: return search_all_organizations() case AssetTypesWithSpecialHandling.PROJECT: - project_rate_limiter, _ = await resolve_request_controllers(kind) + project_rate_limiter, _ = await resolve_request_controllers(kind, method="search") return search_all_projects(rate_limiter=project_rate_limiter) case _: asset_rate_limiter, asset_semaphore = await resolve_request_controllers( @@ -107,7 +107,7 @@ async def resync_organizations(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=AssetTypesWithSpecialHandling.PROJECT) async def resync_projects(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: - resync_projects_rate_limiter, _ = await resolve_request_controllers(kind) + resync_projects_rate_limiter, _ = await resolve_request_controllers(kind, method="search") async for batch in search_all_projects(rate_limiter=resync_projects_rate_limiter): yield batch From cba6966f954c7c0db1bd52bace79992d7772ee12 Mon Sep 17 00:00:00 2001 From: Iyanuoluwa Adebayo Date: Thu, 9 Jan 2025 11:04:34 +0100 Subject: [PATCH 04/10] Fix: Lint issues --- integrations/gcp/gcp_core/search/resource_searches.py | 4 +++- integrations/gcp/gcp_core/utils.py | 4 +++- integrations/gcp/main.py | 8 ++++++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py index 2a69754f21..55547de620 100644 --- a/integrations/gcp/gcp_core/search/resource_searches.py +++ b/integrations/gcp/gcp_core/search/resource_searches.py @@ -327,7 +327,9 @@ async def feed_event_to_resource( config: Optional[ProtoConfig] = None, ) -> RAW_ITEM: resource = None - live_event_projects_rate_limiter, _ = await resolve_request_controllers(AssetTypesWithSpecialHandling.PROJECT, method="get") + live_event_projects_rate_limiter, _ = await resolve_request_controllers( + AssetTypesWithSpecialHandling.PROJECT, method="get" + ) if asset_data.get("deleted") is True: resource = asset_data["priorAsset"]["resource"]["data"] resource[EXTRA_PROJECT_FIELD] = await get_single_project( diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py index 99db46b0d5..3196d0fbf4 100644 --- a/integrations/gcp/gcp_core/utils.py +++ b/integrations/gcp/gcp_core/utils.py @@ -24,7 +24,9 @@ search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject() pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject() project_get_requests_per_minute_per_project = ProjectGetRequestsPerMinutePerProject() -project_search_requests_per_minute_per_project = ProjectSearchRequestsPerMinutePerProject() +project_search_requests_per_minute_per_project = ( + ProjectSearchRequestsPerMinutePerProject() +) EXTRA_PROJECT_FIELD = "__project" DEFAULT_CREDENTIALS_FILE_PATH = ( diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py index 0c39c6b2da..6577a36efc 100644 --- a/integrations/gcp/main.py +++ b/integrations/gcp/main.py @@ -62,7 +62,9 @@ async def _resolve_resync_method_for_resource( case AssetTypesWithSpecialHandling.ORGANIZATION: return search_all_organizations() case AssetTypesWithSpecialHandling.PROJECT: - project_rate_limiter, _ = await resolve_request_controllers(kind, method="search") + project_rate_limiter, _ = await resolve_request_controllers( + kind, method="search" + ) return search_all_projects(rate_limiter=project_rate_limiter) case _: asset_rate_limiter, asset_semaphore = await resolve_request_controllers( @@ -107,7 +109,9 @@ async def resync_organizations(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=AssetTypesWithSpecialHandling.PROJECT) async def resync_projects(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: - resync_projects_rate_limiter, _ = await resolve_request_controllers(kind, method="search") + resync_projects_rate_limiter, _ = await resolve_request_controllers( + kind, method="search" + ) async for batch in search_all_projects(rate_limiter=resync_projects_rate_limiter): yield batch From f39507645ad8457137e97b7344f7ce4f9db5e29b Mon Sep 17 00:00:00 2001 From: Iyanuoluwa Adebayo Date: Thu, 9 Jan 2025 11:09:27 +0100 Subject: [PATCH 05/10] updated version --- integrations/gcp/CHANGELOG.md | 8 ++++++++ integrations/gcp/pyproject.toml | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/integrations/gcp/CHANGELOG.md b/integrations/gcp/CHANGELOG.md index 21ec0dfce3..1127d8b96d 100644 --- a/integrations/gcp/CHANGELOG.md +++ b/integrations/gcp/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.1.90 (2025-01-09) + + +### Improvements + +- Added rate limiting support for `ProjectsV3GetRequestsPerMinutePerProject` and `ProjectV3SearchRequestsPerMinutePerProject` to handle GCP project quota limits during resyncs and real-time event processing + + ## 0.1.89 (2025-01-08) diff --git a/integrations/gcp/pyproject.toml b/integrations/gcp/pyproject.toml index 81520fc17a..7a7271317d 100644 --- a/integrations/gcp/pyproject.toml +++ b/integrations/gcp/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gcp" -version = "0.1.89" +version = "0.1.90" description = "A GCP ocean integration" authors = ["Matan Geva "] From 07400e8dd997fe8fc86138dcb9cbfb86b7ec6141 Mon Sep 17 00:00:00 2001 From: Iyanuoluwa Adebayo Date: Thu, 9 Jan 2025 12:08:12 +0100 Subject: [PATCH 06/10] Updated test case --- .../gcp/tests/gcp_core/search/test_resource_searches.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py index a80c9ce70b..b459be7aaf 100644 --- a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py +++ b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py @@ -9,6 +9,7 @@ from google.cloud.resourcemanager_v3.types import Project + async def mock_subscription_pages( *args: Any, **kwargs: Any ) -> ASYNC_GENERATOR_RESYNC_TYPE: @@ -104,9 +105,10 @@ async def test_get_single_subscription( @pytest.mark.asyncio +@patch("gcp_core.utils.resolve_request_controllers") @patch("gcp_core.utils.get_current_resource_config") async def test_feed_to_resource( - get_current_resource_config_mock: MagicMock, monkeypatch: Any + get_current_resource_config_mock: MagicMock, resolve_request_controllers_mock: AsyncMock, monkeypatch: Any, ) -> None: # Arrange projects_async_client_mock = AsyncMock @@ -136,6 +138,10 @@ async def test_feed_to_resource( from gcp_core.search.resource_searches import feed_event_to_resource + # Mock resolve_request_controllers + mock_rate_limiter = AsyncMock() + resolve_request_controllers_mock.return_value = (mock_rate_limiter, None) + mock_asset_name = "projects/project_name/topics/topic_name" mock_asset_type = "pubsub.googleapis.com/Topic" mock_asset_project_name = "project_name" From 0fa873073a9622b956a8e1a1220169e63423899f Mon Sep 17 00:00:00 2001 From: Iyanuoluwa Adebayo Date: Thu, 9 Jan 2025 12:14:14 +0100 Subject: [PATCH 07/10] fix: lint --- .../gcp/tests/gcp_core/search/test_resource_searches.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py index b459be7aaf..4136d96ed2 100644 --- a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py +++ b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py @@ -9,7 +9,6 @@ from google.cloud.resourcemanager_v3.types import Project - async def mock_subscription_pages( *args: Any, **kwargs: Any ) -> ASYNC_GENERATOR_RESYNC_TYPE: @@ -108,7 +107,9 @@ async def test_get_single_subscription( @patch("gcp_core.utils.resolve_request_controllers") @patch("gcp_core.utils.get_current_resource_config") async def test_feed_to_resource( - get_current_resource_config_mock: MagicMock, resolve_request_controllers_mock: AsyncMock, monkeypatch: Any, + get_current_resource_config_mock: MagicMock, + resolve_request_controllers_mock: AsyncMock, + monkeypatch: Any, ) -> None: # Arrange projects_async_client_mock = AsyncMock From 3bd0f797d574d7080036f7bd5dc44c26b11f73a8 Mon Sep 17 00:00:00 2001 From: Iyanuoluwa Adebayo Date: Tue, 14 Jan 2025 04:03:30 +0100 Subject: [PATCH 08/10] Attended to reviewer comments --- integrations/gcp/gcp_core/search/resource_searches.py | 3 ++- integrations/gcp/gcp_core/utils.py | 9 +++++---- integrations/gcp/main.py | 4 ++-- integrations/gcp/pyproject.toml | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py index 55547de620..d7b3e5c0cd 100644 --- a/integrations/gcp/gcp_core/search/resource_searches.py +++ b/integrations/gcp/gcp_core/search/resource_searches.py @@ -328,7 +328,8 @@ async def feed_event_to_resource( ) -> RAW_ITEM: resource = None live_event_projects_rate_limiter, _ = await resolve_request_controllers( - AssetTypesWithSpecialHandling.PROJECT, method="get" + AssetTypesWithSpecialHandling.PROJECT, + quota_id="ProjectV3GetRequestsPerMinutePerProject", ) if asset_data.get("deleted") is True: resource = asset_data["priorAsset"]["resource"]["data"] diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py index 3196d0fbf4..7f8fec675d 100644 --- a/integrations/gcp/gcp_core/utils.py +++ b/integrations/gcp/gcp_core/utils.py @@ -187,13 +187,12 @@ def get_service_account_project_id() -> str: async def get_quotas_for_project( - project_id: str, kind: str, **kwargs: Any + project_id: str, kind: str, quota_id: Optional[str] = None ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: try: match kind: case AssetTypesWithSpecialHandling.PROJECT: - method = kwargs.get("method") - if method == "search": + if quota_id == "ProjectV3SearchRequestsPerMinutePerProject": project_rate_limiter = ( await project_search_requests_per_minute_per_project.limiter( project_id @@ -257,4 +256,6 @@ async def resolve_request_controllers( kind: str, **kwargs: Any ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: service_account_project_id = get_service_account_project_id() - return await get_quotas_for_project(service_account_project_id, kind, **kwargs) + return await get_quotas_for_project( + service_account_project_id, kind, quota_id=kwargs.get("quota_id") + ) diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py index 6577a36efc..14f56dc1aa 100644 --- a/integrations/gcp/main.py +++ b/integrations/gcp/main.py @@ -63,7 +63,7 @@ async def _resolve_resync_method_for_resource( return search_all_organizations() case AssetTypesWithSpecialHandling.PROJECT: project_rate_limiter, _ = await resolve_request_controllers( - kind, method="search" + kind, quota_id="ProjectV3SearchRequestsPerMinutePerProject" ) return search_all_projects(rate_limiter=project_rate_limiter) case _: @@ -110,7 +110,7 @@ async def resync_organizations(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=AssetTypesWithSpecialHandling.PROJECT) async def resync_projects(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: resync_projects_rate_limiter, _ = await resolve_request_controllers( - kind, method="search" + kind, quota_id="ProjectV3SearchRequestsPerMinutePerProject" ) async for batch in search_all_projects(rate_limiter=resync_projects_rate_limiter): yield batch diff --git a/integrations/gcp/pyproject.toml b/integrations/gcp/pyproject.toml index 1fa4395e8f..376264d4ec 100644 --- a/integrations/gcp/pyproject.toml +++ b/integrations/gcp/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gcp" -version = "0.1.90" +version = "0.1.91" description = "A GCP ocean integration" authors = ["Matan Geva "] From 76203a6df82f3c21246c1cfd95ca4b1e642d3819 Mon Sep 17 00:00:00 2001 From: Adebayo Oluwadunsin Iyanuoluwa <88881603+oiadebayo@users.noreply.github.com> Date: Fri, 17 Jan 2025 09:31:50 -0800 Subject: [PATCH 09/10] Update integrations/gcp/gcp_core/utils.py Co-authored-by: Michael Kofi Armah --- integrations/gcp/gcp_core/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py index 7f8fec675d..da2b7aa503 100644 --- a/integrations/gcp/gcp_core/utils.py +++ b/integrations/gcp/gcp_core/utils.py @@ -253,7 +253,7 @@ async def get_quotas_for_project( async def resolve_request_controllers( - kind: str, **kwargs: Any + kind: str, **kwargs: Dict[str, Any] ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: service_account_project_id = get_service_account_project_id() return await get_quotas_for_project( From 3b2fd4ef81755bd8591634c77e67fab76bfb6f8a Mon Sep 17 00:00:00 2001 From: Adebayo Oluwadunsin Iyanuoluwa <88881603+oiadebayo@users.noreply.github.com> Date: Fri, 17 Jan 2025 09:32:04 -0800 Subject: [PATCH 10/10] Update integrations/gcp/gcp_core/search/resource_searches.py Co-authored-by: Michael Kofi Armah --- integrations/gcp/gcp_core/search/resource_searches.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py index d7b3e5c0cd..bba7310436 100644 --- a/integrations/gcp/gcp_core/search/resource_searches.py +++ b/integrations/gcp/gcp_core/search/resource_searches.py @@ -179,7 +179,7 @@ async def list_all_subscriptions_per_project( @cache_iterator_result() -async def search_all_projects(**kwargs: Any) -> ASYNC_GENERATOR_RESYNC_TYPE: +async def search_all_projects(**kwargs: Dict[str, Any]) -> ASYNC_GENERATOR_RESYNC_TYPE: logger.info("Searching projects") async with ProjectsAsyncClient() as projects_client: async for projects in paginated_query(