From d9c5a70fb698ce77c9526da83f72868d5d1dae54 Mon Sep 17 00:00:00 2001
From: Adebayo Oluwadunsin Iyanuoluwa
<88881603+oiadebayo@users.noreply.github.com>
Date: Mon, 27 Jan 2025 22:06:20 +0100
Subject: [PATCH] [Integration][GCP] Added Rate Limiting for
ProjectsV3GetRequestsPerMinutePerProject Quota (#1304)
# Description
**What**
- Implemented rate limiting for the
`ProjectsV3GetRequestsPerMinutePerProject` quota.
- Adjusted real-time event handling to respect the newly implemented
rate limits.
**Why**
- To address issues reported where real-time event handling exceeded GCP
quota limits, resulting in `429 Quota Exceeded` errors.
- To improve system reliability and data consistency by adhering to quota limits.
**How**
- Introduced a rate-limiting mechanism for the `ProjectsV3GetRequestsPerMinutePerProject` quota using the existing rate-limiter infrastructure (see the sketch after this list).
- Added support in the `resolve_request_controllers` method to fetch and
apply this specific quota.
- Integrated the rate limiter into real-time event processing methods,
particularly for project-related operations.
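A minimal sketch of the core pattern, assuming illustrative values: the quota number and helper names below are hypothetical, while `AsyncLimiter` is the real `aiolimiter` API used by the integration.

```python
import asyncio
from aiolimiter import AsyncLimiter

# Hypothetical quota value for illustration; the integration resolves the
# real per-project quota from GCP at startup.
GET_PROJECT_QUOTA_PER_MINUTE = 600

# A single shared limiter so every caller draws from the same budget.
project_get_limiter = AsyncLimiter(GET_PROJECT_QUOTA_PER_MINUTE, time_period=60)

async def rate_limited_get_project(project_name: str) -> str:
    async with project_get_limiter:
        # The real code awaits ProjectsAsyncClient.get_project() here.
        await asyncio.sleep(0)  # stand-in for the API call
        return project_name

async def main() -> None:
    names = [f"projects/demo-{i}" for i in range(5)]
    print(await asyncio.gather(*(rate_limited_get_project(n) for n in names)))

asyncio.run(main())
```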
## Type of change
- [x] Bug fix (non-breaking change which fixes an issue)
All tests should be run against the Port production environment (using a testing org).
### Core testing checklist
- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector
### Integration testing checklist
- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)
### Preflight checklist
- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account
---------
Co-authored-by: Michael Kofi Armah
Co-authored-by: Matan <51418643+matan84@users.noreply.github.com>
---
integrations/gcp/CHANGELOG.md | 10 +++
.../gcp_core/helpers/ratelimiter/overrides.py | 9 ++
.../gcp/gcp_core/search/resource_searches.py | 50 ++++++-----
integrations/gcp/gcp_core/utils.py | 17 +++-
integrations/gcp/main.py | 90 ++++++++++++++-----
integrations/gcp/pyproject.toml | 2 +-
.../gcp_core/search/test_resource_searches.py | 33 ++++++-
7 files changed, 165 insertions(+), 46 deletions(-)
diff --git a/integrations/gcp/CHANGELOG.md b/integrations/gcp/CHANGELOG.md
index 47fccd19d5..d4b844b0fd 100644
--- a/integrations/gcp/CHANGELOG.md
+++ b/integrations/gcp/CHANGELOG.md
@@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
+## 0.1.98 (2025-01-24)
+
+
+### Improvements
+
+- Added rate limiting support for `ProjectsV3GetRequestsPerMinutePerProject` to handle GCP project quota limits during real-time event processing.
+- Implemented a shared `AsyncLimiter` instance to ensure consistent rate limiting across real-time events.
+- Moved real-time event processing to run in the background, preventing timeouts and ensuring smoother handling of rate-limited operations.
+
+
## 0.1.97 (2025-01-23)
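The shared-instance point in the changelog is the crux: a fresh `AsyncLimiter` per event would give each event its own budget, so the aggregate could still exceed the quota. A minimal demo with illustrative numbers:

```python
import asyncio
from aiolimiter import AsyncLimiter

# One limiter shared by all tasks: roughly two acquisitions per second in
# aggregate. A per-task limiter here would not bound the aggregate rate.
shared = AsyncLimiter(2, time_period=1)

async def op(i: int) -> None:
    async with shared:
        print(f"op {i}")

async def main() -> None:
    # Completes over a couple of seconds instead of firing all at once.
    await asyncio.gather(*(op(i) for i in range(6)))

asyncio.run(main())
```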
diff --git a/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py b/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py
index 4f3d59442a..9976017c8e 100644
--- a/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py
+++ b/integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py
@@ -12,6 +12,10 @@ class PubSubAPI(ResourceBoundedSemaphore):
service = "pubsub.googleapis.com"
+class CloudResourceManagerAPI(ResourceBoundedSemaphore):
+ service = "cloudresourcemanager.googleapis.com"
+
+
class SearchAllResourcesQpmPerProject(CloudAssetAPI):
quota_id = "apiSearchAllResourcesQpmPerProject"
container_type = ContainerType.PROJECT
@@ -20,3 +24,8 @@ class SearchAllResourcesQpmPerProject(CloudAssetAPI):
class PubSubAdministratorPerMinutePerProject(PubSubAPI):
quota_id = "administratorPerMinutePerProject"
container_type = ContainerType.PROJECT
+
+
+class ProjectGetRequestsPerMinutePerProject(CloudResourceManagerAPI):
+ quota_id = "ProjectV3GetRequestsPerMinutePerProject"
+ container_type = ContainerType.PROJECT
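For orientation, the override classes above are thin quota descriptors. A sketch of how the new one is consumed; the `limiter()`/`semaphore()` calls mirror the `get_quotas_for_project` change to `utils.py` later in this patch:

```python
from aiolimiter import AsyncLimiter
from gcp_core.helpers.ratelimiter.overrides import (
    ProjectGetRequestsPerMinutePerProject,
)

project_quota = ProjectGetRequestsPerMinutePerProject()

async def project_controllers(project_id: str):
    # limiter() and semaphore() come from the ResourceBoundedSemaphore base
    # class and resolve the effective quota for the given project.
    rate_limiter: AsyncLimiter = await project_quota.limiter(project_id)
    semaphore = await project_quota.semaphore(project_id)
    return rate_limiter, semaphore
```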
diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py
index 6fab7b5360..2eabb60549 100644
--- a/integrations/gcp/gcp_core/search/resource_searches.py
+++ b/integrations/gcp/gcp_core/search/resource_searches.py
@@ -1,4 +1,4 @@
-from typing import Any, Optional
+from typing import Any
import typing
from google.api_core.exceptions import NotFound, PermissionDenied
@@ -24,6 +24,7 @@
parse_protobuf_messages,
parse_latest_resource_from_asset,
)
+from aiolimiter import AsyncLimiter
from gcp_core.search.paginated_query import paginated_query, DEFAULT_REQUEST_TIMEOUT
from gcp_core.helpers.ratelimiter.base import MAXIMUM_CONCURRENT_REQUESTS
from asyncio import BoundedSemaphore
@@ -214,20 +215,24 @@ async def search_all_organizations() -> ASYNC_GENERATOR_RESYNC_TYPE:
async def get_single_project(
- project_name: str, config: Optional[ProtoConfig] = None
+ project_name: str,
+ rate_limiter: AsyncLimiter,
+ config: ProtoConfig,
) -> RAW_ITEM:
async with ProjectsAsyncClient() as projects_client:
- return parse_protobuf_message(
- await projects_client.get_project(
- name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT
- ),
- config,
- )
+ async with rate_limiter:
+ logger.debug(
+ f"Executing get_single_project. Current rate limit: {rate_limiter.max_rate} requests per {rate_limiter.time_period} seconds."
+ )
+ return parse_protobuf_message(
+ await projects_client.get_project(
+ name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT
+ ),
+ config,
+ )
-async def get_single_folder(
- folder_name: str, config: Optional[ProtoConfig] = None
-) -> RAW_ITEM:
+async def get_single_folder(folder_name: str, config: ProtoConfig) -> RAW_ITEM:
async with FoldersAsyncClient() as folders_client:
return parse_protobuf_message(
await folders_client.get_folder(
@@ -238,7 +243,7 @@ async def get_single_folder(
async def get_single_organization(
- organization_name: str, config: Optional[ProtoConfig] = None
+ organization_name: str, config: ProtoConfig
) -> RAW_ITEM:
async with OrganizationsAsyncClient() as organizations_client:
return parse_protobuf_message(
@@ -251,7 +256,7 @@ async def get_single_organization(
async def get_single_topic(
topic_id: str,
- config: Optional[ProtoConfig] = None,
+ config: ProtoConfig,
) -> RAW_ITEM:
"""
    The Topics are handled separately due to lack of data in the asset itself within the asset inventory, e.g. some properties are missing.
@@ -268,7 +273,7 @@ async def get_single_topic(
async def get_single_subscription(
subscription_id: str,
- config: Optional[ProtoConfig] = None,
+ config: ProtoConfig,
) -> RAW_ITEM:
"""
    Subscriptions are handled separately due to lack of data in the asset itself within the asset inventory, e.g. some properties are missing.
@@ -308,25 +313,28 @@ async def feed_event_to_resource(
asset_name: str,
project_id: str,
asset_data: dict[str, Any],
- config: Optional[ProtoConfig] = None,
+ project_rate_limiter: AsyncLimiter,
+ config: ProtoConfig,
) -> RAW_ITEM:
resource = None
if asset_data.get("deleted") is True:
resource = asset_data["priorAsset"]["resource"]["data"]
- resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id, config)
+ resource[EXTRA_PROJECT_FIELD] = await get_single_project(
+ project_id, project_rate_limiter, config
+ )
else:
match asset_type:
case AssetTypesWithSpecialHandling.TOPIC:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_topic(topic_name, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
- project_id, config
+ project_id, project_rate_limiter, config
)
case AssetTypesWithSpecialHandling.SUBSCRIPTION:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_subscription(topic_name, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
- project_id, config
+ project_id, project_rate_limiter, config
)
case AssetTypesWithSpecialHandling.FOLDER:
folder_id = asset_name.replace(
@@ -339,10 +347,12 @@ async def feed_event_to_resource(
)
resource = await get_single_organization(organization_id, config)
case AssetTypesWithSpecialHandling.PROJECT:
- resource = await get_single_project(project_id, config)
+ resource = await get_single_project(
+ project_id, project_rate_limiter, config
+ )
case _:
resource = asset_data["asset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
- project_id, config
+ project_id, project_rate_limiter, config
)
return resource
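With the signature changes above, call sites now thread the shared limiter through explicitly. A sketch of the new call shape, mirroring the keyword arguments used in the updated tests; the one-request-per-second limiter is illustrative only:

```python
from aiolimiter import AsyncLimiter
from gcp_core.overrides import ProtoConfig
from gcp_core.search.resource_searches import feed_event_to_resource

async def handle(
    asset_type: str, asset_name: str, project_id: str, asset_data: dict
) -> dict:
    # In production this is the single shared limiter created on startup,
    # not a fresh instance per call as shown here.
    limiter = AsyncLimiter(1, 1)
    return await feed_event_to_resource(
        asset_type=asset_type,
        asset_name=asset_name,
        project_id=project_id,
        asset_data=asset_data,
        project_rate_limiter=limiter,
        config=ProtoConfig(preserving_proto_field_name=False),
    )
```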
diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py
index 422699c6fb..f42fd9b17d 100644
--- a/integrations/gcp/gcp_core/utils.py
+++ b/integrations/gcp/gcp_core/utils.py
@@ -17,10 +17,12 @@
from gcp_core.helpers.ratelimiter.overrides import (
SearchAllResourcesQpmPerProject,
PubSubAdministratorPerMinutePerProject,
+ ProjectGetRequestsPerMinutePerProject,
)
search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject()
pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject()
+project_get_requests_per_minute_per_project = ProjectGetRequestsPerMinutePerProject()
EXTRA_PROJECT_FIELD = "__project"
DEFAULT_CREDENTIALS_FILE_PATH = (
@@ -181,10 +183,23 @@ def get_service_account_project_id() -> str:
async def get_quotas_for_project(
- project_id: str, kind: str
+ project_id: str,
+ kind: str,
) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
try:
match kind:
+ case AssetTypesWithSpecialHandling.PROJECT:
+ project_rate_limiter = (
+ await project_get_requests_per_minute_per_project.limiter(
+ project_id
+ )
+ )
+ project_semaphore = (
+ await project_get_requests_per_minute_per_project.semaphore(
+ project_id
+ )
+ )
+ return project_rate_limiter, project_semaphore
case (
AssetTypesWithSpecialHandling.TOPIC
| AssetTypesWithSpecialHandling.SUBSCRIPTION
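`get_quotas_for_project` hands back two controllers because they guard different things: the semaphore caps concurrent in-flight requests, while the limiter caps the request rate. A sketch of typical consumer usage (assumed, not code from this patch):

```python
import asyncio
from aiolimiter import AsyncLimiter

async def guarded_call(
    limiter: AsyncLimiter, semaphore: asyncio.BoundedSemaphore
) -> None:
    # The semaphore bounds how many requests are in flight at once; the
    # limiter bounds how many start per time window. Both apply per project.
    async with semaphore:
        async with limiter:
            ...  # the actual GCP API call would go here
```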
diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py
index 482c6bf928..0eeb8ed849 100644
--- a/integrations/gcp/main.py
+++ b/integrations/gcp/main.py
@@ -3,7 +3,8 @@
import tempfile
import typing
-from fastapi import Request, Response
+from aiolimiter import AsyncLimiter
+from fastapi import Request, Response, BackgroundTasks
from loguru import logger
from port_ocean.context.ocean import ocean
@@ -17,7 +18,6 @@
from gcp_core.overrides import (
GCPCloudResourceSelector,
GCPPortAppConfig,
- GCPResourceSelector,
ProtoConfig,
)
from port_ocean.context.event import event
@@ -38,6 +38,8 @@
resolve_request_controllers,
)
+PROJECT_V3_GET_REQUESTS_RATE_LIMITER: AsyncLimiter
+
async def _resolve_resync_method_for_resource(
kind: str,
@@ -75,6 +77,15 @@ async def _resolve_resync_method_for_resource(
)
+@ocean.on_start()
+async def setup_real_time_request_controllers() -> None:
+ global PROJECT_V3_GET_REQUESTS_RATE_LIMITER
+    if ocean.event_listener_type != "ONCE":
+ PROJECT_V3_GET_REQUESTS_RATE_LIMITER, _ = await resolve_request_controllers(
+ AssetTypesWithSpecialHandling.PROJECT
+ )
+
+
@ocean.on_start()
async def setup_application_default_credentials() -> None:
if not ocean.integration_config["encoded_adc_configuration"]:
@@ -163,8 +174,56 @@ async def resync_cloud_resources(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
yield resources_batch
+async def process_realtime_event(
+ asset_type: str,
+ asset_name: str,
+ asset_project: str,
+ asset_data: dict[str, typing.Any],
+ config: ProtoConfig,
+) -> None:
+ """
+ This function runs in the background to ensure the real-time event endpoints
+ do not time out while waiting for rate-limited operations to complete. It is triggered
+ by the real-time events handler when a new event is received.
+
+    PROJECT_V3_GET_REQUESTS_RATE_LIMITER is provided as a module-level instance rather than being resolved per event because all
+    real-time events need to share the same limiter, so it has to be instantiated on start.
+    Resolving the limiter dynamically would make it impossible to share one instance across all event contexts.
+ """
+ try:
+ logger.debug(
+ f"Processing real-time event for {asset_type} : {asset_name} in the background"
+ )
+ asset_resource_data = await feed_event_to_resource(
+ asset_type,
+ asset_name,
+ asset_project,
+ asset_data,
+ PROJECT_V3_GET_REQUESTS_RATE_LIMITER,
+ config,
+ )
+ if asset_data.get("deleted") is True:
+ logger.info(
+ f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port"
+ )
+ await ocean.unregister_raw(asset_type, [asset_resource_data])
+ else:
+ logger.info(
+ f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port"
+ )
+ await ocean.register_raw(asset_type, [asset_resource_data])
+ except AssetHasNoProjectAncestorError:
+ logger.exception(
+            f"Couldn't find project ancestor to asset {asset_name}. Other types of ancestors are not supported yet."
+ )
+ except Exception as e:
+        logger.exception(f"Got error {e} while processing a real-time event")
+
+
@ocean.router.post("/events")
-async def feed_events_callback(request: Request) -> Response:
+async def feed_events_callback(
+ request: Request, background_tasks: BackgroundTasks
+) -> Response:
"""
This is the real-time events handler. The subscription which is connected to the Feeds Topic will send events here once
the events are inserted into the Assets Inventory.
@@ -193,10 +252,7 @@ async def feed_events_callback(request: Request) -> Response:
matching_resource_configs = [
resource_config
for resource_config in resource_configs
- if (
- resource_config.kind == asset_type
- and isinstance(resource_config.selector, GCPResourceSelector)
- )
+ if (resource_config.kind == asset_type)
]
for matching_resource_config in matching_resource_configs:
selector = matching_resource_config.selector
@@ -205,30 +261,24 @@ async def feed_events_callback(request: Request) -> Response:
getattr(selector, "preserve_api_response_case_style", False)
)
)
- asset_resource_data = await feed_event_to_resource(
+ background_tasks.add_task(
+ process_realtime_event,
asset_type,
asset_name,
asset_project,
asset_data,
config,
)
- if asset_data.get("deleted") is True:
- logger.info(
- f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port"
- )
- await ocean.unregister_raw(asset_type, [asset_resource_data])
- else:
- logger.info(
- f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port"
- )
- await ocean.register_raw(asset_type, [asset_resource_data])
+ logger.info(
+ f"Added background task to process real-time event for kind: {asset_type} with name: {asset_name} from project: {asset_project}"
+ )
except AssetHasNoProjectAncestorError:
logger.exception(
f"Couldn't find project ancestor to asset {asset_name}. Other types of ancestors and not supported yet."
)
except GotFeedCreatedSuccessfullyMessageError:
logger.info("Assets Feed created successfully")
- except Exception:
- logger.exception("Got error while handling a real time event")
+ except Exception as e:
+        logger.exception(f"Got error {str(e)} while handling a real-time event")
return Response(status_code=http.HTTPStatus.INTERNAL_SERVER_ERROR)
return Response(status_code=200)
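The endpoint rewrite above relies on FastAPI's standard `BackgroundTasks`. A standalone sketch of the pattern: the route and payload shape are hypothetical, while the `add_task` API is real.

```python
from fastapi import BackgroundTasks, FastAPI, Response

app = FastAPI()

async def process_event(payload: dict) -> None:
    ...  # rate-limited processing happens here, after the response is sent

@app.post("/events")
async def events(payload: dict, background_tasks: BackgroundTasks) -> Response:
    # add_task schedules the coroutine to run once the response has been
    # returned, so the push subscription never times out waiting on quotas.
    background_tasks.add_task(process_event, payload)
    return Response(status_code=200)
```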
diff --git a/integrations/gcp/pyproject.toml b/integrations/gcp/pyproject.toml
index 41f08586a7..970c13df66 100644
--- a/integrations/gcp/pyproject.toml
+++ b/integrations/gcp/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gcp"
-version = "0.1.97"
+version = "0.1.98"
description = "A GCP ocean integration"
authors = ["Matan Geva "]
diff --git a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py
index a80c9ce70b..e815be5870 100644
--- a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py
+++ b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py
@@ -7,6 +7,9 @@
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from google.pubsub_v1.types import pubsub
from google.cloud.resourcemanager_v3.types import Project
+from gcp_core.overrides import (
+ ProtoConfig,
+)
async def mock_subscription_pages(
@@ -97,7 +100,10 @@ async def test_get_single_subscription(
# Act within event context
async with event_context("test_event"):
- actual_subscription = await get_single_subscription("subscription_name")
+ config = ProtoConfig(
+ preserving_proto_field_name=mock_resource_config.selector.preserve_api_response_case_style
+ )
+ actual_subscription = await get_single_subscription("subscription_name", config)
# Assert
assert actual_subscription == expected_subscription
@@ -106,7 +112,8 @@ async def test_get_single_subscription(
@pytest.mark.asyncio
@patch("gcp_core.utils.get_current_resource_config")
async def test_feed_to_resource(
- get_current_resource_config_mock: MagicMock, monkeypatch: Any
+ get_current_resource_config_mock: MagicMock,
+ monkeypatch: Any,
) -> None:
# Arrange
projects_async_client_mock = AsyncMock
@@ -136,6 +143,9 @@ async def test_feed_to_resource(
from gcp_core.search.resource_searches import feed_event_to_resource
+    # Mock the shared rate limiter that feed_event_to_resource now requires
+    mock_rate_limiter = AsyncMock()
+
mock_asset_name = "projects/project_name/topics/topic_name"
mock_asset_type = "pubsub.googleapis.com/Topic"
mock_asset_project_name = "project_name"
@@ -167,11 +177,16 @@ async def test_feed_to_resource(
# Act within event context
async with event_context("test_event"):
+ config = ProtoConfig(
+ preserving_proto_field_name=mock_resource_config.selector.preserve_api_response_case_style
+ )
actual_resource = await feed_event_to_resource(
asset_type=mock_asset_type,
asset_name=mock_asset_name,
+ project_rate_limiter=mock_rate_limiter,
project_id=mock_asset_project_name,
asset_data=mock_asset_data,
+ config=config,
)
# Assert
@@ -231,7 +246,12 @@ async def test_preserve_case_style_combined(
# Act within event context for preserve_case_style = True
async with event_context("test_event"):
- actual_subscription_true = await get_single_subscription("subscription_name")
+ config = ProtoConfig(
+ preserving_proto_field_name=mock_resource_config_true.selector.preserve_api_response_case_style
+ )
+ actual_subscription_true = await get_single_subscription(
+ "subscription_name", config
+ )
# Assert for preserve_case_style = True
assert actual_subscription_true == expected_subscription_true
@@ -258,7 +278,12 @@ async def test_preserve_case_style_combined(
# Act within event context for preserve_case_style = False
async with event_context("test_event"):
- actual_subscription_false = await get_single_subscription("subscription_name")
+ config = ProtoConfig(
+ preserving_proto_field_name=mock_resource_config_false.selector.preserve_api_response_case_style
+ )
+ actual_subscription_false = await get_single_subscription(
+ "subscription_name", config
+ )
# Assert for preserve_case_style = False
assert actual_subscription_false == expected_subscription_false