diff --git a/integrations/gcp/CHANGELOG.md b/integrations/gcp/CHANGELOG.md index dfbd7abed7..3f985d74d5 100644 --- a/integrations/gcp/CHANGELOG.md +++ b/integrations/gcp/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.1.93 (2025-01-16) + + +### Improvements + +- Added rate limiting support for `ProjectV3GetRequestsPerMinutePerProject` and `ProjectV3SearchRequestsPerMinutePerProject` to handle GCP project quota limits during resyncs and real-time event processing + + ## 0.1.92 (2025-01-16) diff --git a/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py b/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py index 4f3d59442a..b9c1ec8ecf 100644 --- a/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py +++ b/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py @@ -12,6 +12,10 @@ class PubSubAPI(ResourceBoundedSemaphore): service = "pubsub.googleapis.com" +class CloudResourceManagerAPI(ResourceBoundedSemaphore): + service = "cloudresourcemanager.googleapis.com" + + class SearchAllResourcesQpmPerProject(CloudAssetAPI): quota_id = "apiSearchAllResourcesQpmPerProject" container_type = ContainerType.PROJECT @@ -20,3 +24,13 @@ class SearchAllResourcesQpmPerProject(CloudAssetAPI): class PubSubAdministratorPerMinutePerProject(PubSubAPI): quota_id = "administratorPerMinutePerProject" container_type = ContainerType.PROJECT + + +class ProjectGetRequestsPerMinutePerProject(CloudResourceManagerAPI): + quota_id = "ProjectV3GetRequestsPerMinutePerProject" + container_type = ContainerType.PROJECT + + +class ProjectSearchRequestsPerMinutePerProject(CloudResourceManagerAPI): + quota_id = "ProjectV3SearchRequestsPerMinutePerProject" + container_type = ContainerType.PROJECT diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py index 6fab7b5360..bba7310436 100644 --- a/integrations/gcp/gcp_core/search/resource_searches.py +++ 
b/integrations/gcp/gcp_core/search/resource_searches.py @@ -13,6 +13,8 @@ from google.pubsub_v1.services.publisher import PublisherAsyncClient from google.pubsub_v1.services.subscriber import SubscriberAsyncClient from loguru import logger + +from gcp_core.utils import resolve_request_controllers from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_ITEM from port_ocean.utils.cache import cache_iterator_result from gcp_core.errors import ResourceNotFoundError @@ -177,7 +179,7 @@ async def list_all_subscriptions_per_project( @cache_iterator_result() -async def search_all_projects() -> ASYNC_GENERATOR_RESYNC_TYPE: +async def search_all_projects(**kwargs: Dict[str, Any]) -> ASYNC_GENERATOR_RESYNC_TYPE: logger.info("Searching projects") async with ProjectsAsyncClient() as projects_client: async for projects in paginated_query( @@ -185,6 +187,7 @@ async def search_all_projects() -> ASYNC_GENERATOR_RESYNC_TYPE: "search_projects", {}, lambda response: parse_protobuf_messages(response.projects), + kwargs.get("rate_limiter"), ): yield projects @@ -214,15 +217,28 @@ async def search_all_organizations() -> ASYNC_GENERATOR_RESYNC_TYPE: async def get_single_project( - project_name: str, config: Optional[ProtoConfig] = None + project_name: str, config: Optional[ProtoConfig] = None, **kwargs: Any ) -> RAW_ITEM: async with ProjectsAsyncClient() as projects_client: - return parse_protobuf_message( - await projects_client.get_project( - name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT - ), - config, - ) + rate_limiter = kwargs.get("rate_limiter") + if rate_limiter: + async with rate_limiter: + logger.info( + f"Executing get_single_project. Current rate limit: {rate_limiter.max_rate} requests per {rate_limiter.time_period} seconds." 
+ ) + return parse_protobuf_message( + await projects_client.get_project( + name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT + ), + config, + ) + else: + return parse_protobuf_message( + await projects_client.get_project( + name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT + ), + config, + ) async def get_single_folder( @@ -311,22 +327,28 @@ async def feed_event_to_resource( config: Optional[ProtoConfig] = None, ) -> RAW_ITEM: resource = None + live_event_projects_rate_limiter, _ = await resolve_request_controllers( + AssetTypesWithSpecialHandling.PROJECT, + quota_id="ProjectV3GetRequestsPerMinutePerProject", + ) if asset_data.get("deleted") is True: resource = asset_data["priorAsset"]["resource"]["data"] - resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id, config) + resource[EXTRA_PROJECT_FIELD] = await get_single_project( + project_id, config, rate_limiter=live_event_projects_rate_limiter + ) else: match asset_type: case AssetTypesWithSpecialHandling.TOPIC: topic_name = asset_name.replace("//pubsub.googleapis.com/", "") resource = await get_single_topic(topic_name, config) resource[EXTRA_PROJECT_FIELD] = await get_single_project( - project_id, config + project_id, config, rate_limiter=live_event_projects_rate_limiter ) case AssetTypesWithSpecialHandling.SUBSCRIPTION: topic_name = asset_name.replace("//pubsub.googleapis.com/", "") resource = await get_single_subscription(topic_name, config) resource[EXTRA_PROJECT_FIELD] = await get_single_project( - project_id, config + project_id, config, rate_limiter=live_event_projects_rate_limiter ) case AssetTypesWithSpecialHandling.FOLDER: folder_id = asset_name.replace( @@ -339,10 +361,12 @@ async def feed_event_to_resource( ) resource = await get_single_organization(organization_id, config) case AssetTypesWithSpecialHandling.PROJECT: - resource = await get_single_project(project_id, config) + resource = await get_single_project( + project_id, config, rate_limiter=live_event_projects_rate_limiter + ) 
case _: resource = asset_data["asset"]["resource"]["data"] resource[EXTRA_PROJECT_FIELD] = await get_single_project( - project_id, config + project_id, config, rate_limiter=live_event_projects_rate_limiter ) return resource diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py index 422699c6fb..da2b7aa503 100644 --- a/integrations/gcp/gcp_core/utils.py +++ b/integrations/gcp/gcp_core/utils.py @@ -17,10 +17,16 @@ from gcp_core.helpers.ratelimiter.overrides import ( SearchAllResourcesQpmPerProject, PubSubAdministratorPerMinutePerProject, + ProjectGetRequestsPerMinutePerProject, + ProjectSearchRequestsPerMinutePerProject, ) search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject() pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject() +project_get_requests_per_minute_per_project = ProjectGetRequestsPerMinutePerProject() +project_search_requests_per_minute_per_project = ( + ProjectSearchRequestsPerMinutePerProject() +) EXTRA_PROJECT_FIELD = "__project" DEFAULT_CREDENTIALS_FILE_PATH = ( @@ -181,10 +187,35 @@ def get_service_account_project_id() -> str: async def get_quotas_for_project( - project_id: str, kind: str + project_id: str, kind: str, quota_id: Optional[str] = None ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: try: match kind: + case AssetTypesWithSpecialHandling.PROJECT: + if quota_id == "ProjectV3SearchRequestsPerMinutePerProject": + project_rate_limiter = ( + await project_search_requests_per_minute_per_project.limiter( + project_id + ) + ) + project_semaphore = ( + await project_search_requests_per_minute_per_project.semaphore( + project_id + ) + ) + return project_rate_limiter, project_semaphore + else: + project_rate_limiter = ( + await project_get_requests_per_minute_per_project.limiter( + project_id + ) + ) + project_semaphore = ( + await project_get_requests_per_minute_per_project.semaphore( + project_id + ) + ) + return project_rate_limiter, project_semaphore case ( 
AssetTypesWithSpecialHandling.TOPIC | AssetTypesWithSpecialHandling.SUBSCRIPTION @@ -222,7 +253,9 @@ async def get_quotas_for_project( async def resolve_request_controllers( - kind: str, + kind: str, **kwargs: Dict[str, Any] ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: service_account_project_id = get_service_account_project_id() - return await get_quotas_for_project(service_account_project_id, kind) + return await get_quotas_for_project( + service_account_project_id, kind, quota_id=kwargs.get("quota_id") + ) diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py index 482c6bf928..14f56dc1aa 100644 --- a/integrations/gcp/main.py +++ b/integrations/gcp/main.py @@ -62,7 +62,10 @@ async def _resolve_resync_method_for_resource( case AssetTypesWithSpecialHandling.ORGANIZATION: return search_all_organizations() case AssetTypesWithSpecialHandling.PROJECT: - return search_all_projects() + project_rate_limiter, _ = await resolve_request_controllers( + kind, quota_id="ProjectV3SearchRequestsPerMinutePerProject" + ) + return search_all_projects(rate_limiter=project_rate_limiter) case _: asset_rate_limiter, asset_semaphore = await resolve_request_controllers( kind @@ -106,7 +109,10 @@ async def resync_organizations(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=AssetTypesWithSpecialHandling.PROJECT) async def resync_projects(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: - async for batch in search_all_projects(): + resync_projects_rate_limiter, _ = await resolve_request_controllers( + kind, quota_id="ProjectV3SearchRequestsPerMinutePerProject" + ) + async for batch in search_all_projects(rate_limiter=resync_projects_rate_limiter): yield batch diff --git a/integrations/gcp/pyproject.toml b/integrations/gcp/pyproject.toml index c7687e46d8..ef2a6ec5d2 100644 --- a/integrations/gcp/pyproject.toml +++ b/integrations/gcp/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gcp" -version = "0.1.92" +version = "0.1.93" description = "A GCP ocean 
integration" authors = ["Matan Geva "] diff --git a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py index a80c9ce70b..4136d96ed2 100644 --- a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py +++ b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py @@ -104,9 +104,12 @@ async def test_get_single_subscription( @pytest.mark.asyncio +@patch("gcp_core.utils.resolve_request_controllers") @patch("gcp_core.utils.get_current_resource_config") async def test_feed_to_resource( - get_current_resource_config_mock: MagicMock, monkeypatch: Any + get_current_resource_config_mock: MagicMock, + resolve_request_controllers_mock: AsyncMock, + monkeypatch: Any, ) -> None: # Arrange projects_async_client_mock = AsyncMock @@ -136,6 +139,10 @@ async def test_feed_to_resource( from gcp_core.search.resource_searches import feed_event_to_resource + # Mock resolve_request_controllers + mock_rate_limiter = AsyncMock() + resolve_request_controllers_mock.return_value = (mock_rate_limiter, None) + mock_asset_name = "projects/project_name/topics/topic_name" mock_asset_type = "pubsub.googleapis.com/Topic" mock_asset_project_name = "project_name"