[Integration][GCP] Added Rate Limiting for ProjectsV3GetRequestsPerMinutePerProject Quota (#1304)

# Description

**What**
- Implemented rate limiting for the
`ProjectsV3GetRequestsPerMinutePerProject` quota.
- Adjusted real-time event handling to respect the newly implemented
rate limits.

**Why**
- To address issues reported where real-time event handling exceeded GCP
quota limits, resulting in `429 Quota Exceeded` errors.
- Improve system reliability and data consistency by adhering to quota
limits.

**How**
- Introduced a rate limiting mechanism for the
`ProjectsV3GetRequestsPerMinutePerProject` quota using the existing rate
limiter.
- Added support in the `resolve_request_controllers` method to fetch and
apply this specific quota.
- Integrated the rate limiter into real-time event processing methods,
particularly for project-related operations; a minimal sketch of the
pattern follows below.
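
The sketch below is illustrative (names and quota values are assumptions, not the integration's exact code): a single shared `AsyncLimiter` throttles every `projects.get` call against the same budget, so concurrent real-time events queue up instead of bursting past the quota.

```python
import asyncio

from aiolimiter import AsyncLimiter

# Assumed budget for the sketch; the integration resolves the real value from
# the ProjectsV3GetRequestsPerMinutePerProject quota at startup.
PROJECT_GET_LIMITER = AsyncLimiter(max_rate=5, time_period=1)  # 5 requests/second

async def get_project(project_name: str) -> dict:
    async with PROJECT_GET_LIMITER:  # waits here once the budget is exhausted
        await asyncio.sleep(0.01)    # stand-in for projects_client.get_project(...)
        return {"name": project_name}

async def main() -> None:
    # 20 concurrent calls finish in ~4 seconds instead of firing all at once.
    await asyncio.gather(*(get_project(f"projects/p-{i}") for i in range(20)))

asyncio.run(main())
```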


## Type of change

- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

<h4> All tests should be run against the Port production environment (using a testing org). </h4>

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector


### Integration testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.

---------

Co-authored-by: Michael Kofi Armah <[email protected]>
Co-authored-by: Matan <[email protected]>
3 people authored Jan 27, 2025
1 parent dbff904 commit d9c5a70
Showing 7 changed files with 165 additions and 46 deletions.
10 changes: 10 additions & 0 deletions integrations/gcp/CHANGELOG.md
@@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.98 (2025-01-24)


### Improvements

- Added rate limiting support for `ProjectsV3GetRequestsPerMinutePerProject` to handle GCP project quota limits during real-time event processing.
- Implemented a shared `AsyncLimiter` instance to ensure consistent rate limiting across real-time events.
- Moved real-time event processing to run in the background, preventing timeouts and ensuring smoother handling of rate-limited operations.


## 0.1.97 (2025-01-23)


9 changes: 9 additions & 0 deletions integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py
@@ -12,6 +12,10 @@ class PubSubAPI(ResourceBoundedSemaphore):
service = "pubsub.googleapis.com"


class CloudResourceManagerAPI(ResourceBoundedSemaphore):
service = "cloudresourcemanager.googleapis.com"


class SearchAllResourcesQpmPerProject(CloudAssetAPI):
quota_id = "apiSearchAllResourcesQpmPerProject"
container_type = ContainerType.PROJECT
@@ -20,3 +24,8 @@ class SearchAllResourcesQpmPerProject(CloudAssetAPI):
class PubSubAdministratorPerMinutePerProject(PubSubAPI):
quota_id = "administratorPerMinutePerProject"
container_type = ContainerType.PROJECT


class ProjectGetRequestsPerMinutePerProject(CloudResourceManagerAPI):
quota_id = "ProjectV3GetRequestsPerMinutePerProject"
container_type = ContainerType.PROJECT
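
For context, each override class binds a GCP service to a quota id and a container scope; the `utils.py` hunk below consumes the new class through awaitable `limiter()`/`semaphore()` helpers. A hedged sketch of that consumption (the wrapper name is illustrative; running it requires the integration's `gcp_core` package and GCP credentials):

```python
from gcp_core.helpers.ratelimiter.overrides import (
    ProjectGetRequestsPerMinutePerProject,
)

project_quota = ProjectGetRequestsPerMinutePerProject()

async def project_controllers(project_id: str):
    # Resolve the effective quota for this project and wrap it in
    # asyncio-friendly primitives: a rate limiter plus a concurrency bound.
    rate_limiter = await project_quota.limiter(project_id)
    semaphore = await project_quota.semaphore(project_id)
    return rate_limiter, semaphore
```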
50 changes: 30 additions & 20 deletions integrations/gcp/gcp_core/search/resource_searches.py
@@ -1,4 +1,4 @@
from typing import Any, Optional
from typing import Any
import typing

from google.api_core.exceptions import NotFound, PermissionDenied
@@ -24,6 +24,7 @@
parse_protobuf_messages,
parse_latest_resource_from_asset,
)
from aiolimiter import AsyncLimiter
from gcp_core.search.paginated_query import paginated_query, DEFAULT_REQUEST_TIMEOUT
from gcp_core.helpers.ratelimiter.base import MAXIMUM_CONCURRENT_REQUESTS
from asyncio import BoundedSemaphore
@@ -214,20 +215,24 @@ async def search_all_organizations() -> ASYNC_GENERATOR_RESYNC_TYPE:


async def get_single_project(
project_name: str, config: Optional[ProtoConfig] = None
project_name: str,
rate_limiter: AsyncLimiter,
config: ProtoConfig,
) -> RAW_ITEM:
async with ProjectsAsyncClient() as projects_client:
return parse_protobuf_message(
await projects_client.get_project(
name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT
),
config,
)
async with rate_limiter:
logger.debug(
f"Executing get_single_project. Current rate limit: {rate_limiter.max_rate} requests per {rate_limiter.time_period} seconds."
)
return parse_protobuf_message(
await projects_client.get_project(
name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT
),
config,
)


async def get_single_folder(
folder_name: str, config: Optional[ProtoConfig] = None
) -> RAW_ITEM:
async def get_single_folder(folder_name: str, config: ProtoConfig) -> RAW_ITEM:
async with FoldersAsyncClient() as folders_client:
return parse_protobuf_message(
await folders_client.get_folder(
@@ -238,7 +243,7 @@ async def get_single_folder(


async def get_single_organization(
organization_name: str, config: Optional[ProtoConfig] = None
organization_name: str, config: ProtoConfig
) -> RAW_ITEM:
async with OrganizationsAsyncClient() as organizations_client:
return parse_protobuf_message(
@@ -251,7 +256,7 @@ async def get_single_organization(

async def get_single_topic(
topic_id: str,
config: Optional[ProtoConfig] = None,
config: ProtoConfig,
) -> RAW_ITEM:
"""
Topics are handled specially due to lack of data in the asset itself within the asset inventory - e.g. some properties are missing.
@@ -268,7 +273,7 @@ async def get_single_topic(

async def get_single_subscription(
subscription_id: str,
config: Optional[ProtoConfig] = None,
config: ProtoConfig,
) -> RAW_ITEM:
"""
Subscriptions are handled specially due to lack of data in the asset itself within the asset inventory - e.g. some properties are missing.
@@ -308,25 +313,28 @@ async def feed_event_to_resource(
asset_name: str,
project_id: str,
asset_data: dict[str, Any],
config: Optional[ProtoConfig] = None,
project_rate_limiter: AsyncLimiter,
config: ProtoConfig,
) -> RAW_ITEM:
resource = None
if asset_data.get("deleted") is True:
resource = asset_data["priorAsset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, project_rate_limiter, config
)
else:
match asset_type:
case AssetTypesWithSpecialHandling.TOPIC:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_topic(topic_name, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config
project_id, project_rate_limiter, config
)
case AssetTypesWithSpecialHandling.SUBSCRIPTION:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_subscription(topic_name, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config
project_id, project_rate_limiter, config
)
case AssetTypesWithSpecialHandling.FOLDER:
folder_id = asset_name.replace(
@@ -339,10 +347,12 @@
)
resource = await get_single_organization(organization_id, config)
case AssetTypesWithSpecialHandling.PROJECT:
resource = await get_single_project(project_id, config)
resource = await get_single_project(
project_id, project_rate_limiter, config
)
case _:
resource = asset_data["asset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config
project_id, project_rate_limiter, config
)
return resource
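
Note the signature change: `config` was previously `Optional[ProtoConfig] = None` and the limiter parameter did not exist, whereas both are now required positionals, so every caller must thread the shared limiter through explicitly. A hedged sketch of the call-site change (the `ProtoConfig` field name follows the `main.py` hunk below; the project name and values are illustrative):

```python
# Before: project = await get_single_project("projects/my-proj")
# After: the shared AsyncLimiter (a module-level global in main.py) is passed
# explicitly, so every real-time code path draws from the same quota budget.
config = ProtoConfig(preserve_api_response_case_style=False)  # assumed field name
project = await get_single_project(
    "projects/my-proj",                    # illustrative project name
    PROJECT_V3_GET_REQUESTS_RATE_LIMITER,  # shared limiter created at startup
    config,
)
```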
17 changes: 16 additions & 1 deletion integrations/gcp/gcp_core/utils.py
@@ -17,10 +17,12 @@
from gcp_core.helpers.ratelimiter.overrides import (
SearchAllResourcesQpmPerProject,
PubSubAdministratorPerMinutePerProject,
ProjectGetRequestsPerMinutePerProject,
)

search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject()
pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject()
project_get_requests_per_minute_per_project = ProjectGetRequestsPerMinutePerProject()

EXTRA_PROJECT_FIELD = "__project"
DEFAULT_CREDENTIALS_FILE_PATH = (
@@ -181,10 +183,23 @@ def get_service_account_project_id() -> str:


async def get_quotas_for_project(
project_id: str, kind: str
project_id: str,
kind: str,
) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
try:
match kind:
case AssetTypesWithSpecialHandling.PROJECT:
project_rate_limiter = (
await project_get_requests_per_minute_per_project.limiter(
project_id
)
)
project_semaphore = (
await project_get_requests_per_minute_per_project.semaphore(
project_id
)
)
return project_rate_limiter, project_semaphore
case (
AssetTypesWithSpecialHandling.TOPIC
| AssetTypesWithSpecialHandling.SUBSCRIPTION
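
A hedged sketch of exercising the new `PROJECT` branch (the import path for `AssetTypesWithSpecialHandling` and the nesting order of the two context managers are assumptions for illustration):

```python
from gcp_core.utils import get_quotas_for_project
from gcp_core.utils import AssetTypesWithSpecialHandling  # assumed import path

async def throttled_project_get(project_id: str) -> None:
    limiter, semaphore = await get_quotas_for_project(
        project_id, AssetTypesWithSpecialHandling.PROJECT
    )
    async with semaphore:    # bounds how many requests are in flight at once
        async with limiter:  # bounds how many requests start per time window
            ...              # perform the projects.get call here
```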
90 changes: 70 additions & 20 deletions integrations/gcp/main.py
@@ -3,7 +3,8 @@
import tempfile
import typing

from fastapi import Request, Response
from aiolimiter import AsyncLimiter
from fastapi import Request, Response, BackgroundTasks
from loguru import logger

from port_ocean.context.ocean import ocean
@@ -17,7 +18,6 @@
from gcp_core.overrides import (
GCPCloudResourceSelector,
GCPPortAppConfig,
GCPResourceSelector,
ProtoConfig,
)
from port_ocean.context.event import event
Expand All @@ -38,6 +38,8 @@
resolve_request_controllers,
)

PROJECT_V3_GET_REQUESTS_RATE_LIMITER: AsyncLimiter


async def _resolve_resync_method_for_resource(
kind: str,
@@ -75,6 +77,15 @@ async def _resolve_resync_method_for_resource(
)


@ocean.on_start()
async def setup_real_time_request_controllers() -> None:
global PROJECT_V3_GET_REQUESTS_RATE_LIMITER
if not ocean.event_listener_type == "ONCE":
PROJECT_V3_GET_REQUESTS_RATE_LIMITER, _ = await resolve_request_controllers(
AssetTypesWithSpecialHandling.PROJECT
)


@ocean.on_start()
async def setup_application_default_credentials() -> None:
if not ocean.integration_config["encoded_adc_configuration"]:
@@ -163,8 +174,56 @@ async def resync_cloud_resources(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
yield resources_batch


async def process_realtime_event(
asset_type: str,
asset_name: str,
asset_project: str,
asset_data: dict[str, typing.Any],
config: ProtoConfig,
) -> None:
"""
This function runs in the background to ensure the real-time event endpoints
do not time out while waiting for rate-limited operations to complete. It is triggered
by the real-time events handler when a new event is received.
PROJECT_V3_GET_REQUESTS_RATE_LIMITER is provided as a static value instead of being dynamic because all real-time events
need to share the same instance of the limiter, and it has to be instantiated at startup for this to be possible.
Dynamic initialization of the limiter would make it impossible to share the same instance across all event contexts.
"""
try:
logger.debug(
f"Processing real-time event for {asset_type} : {asset_name} in the background"
)
asset_resource_data = await feed_event_to_resource(
asset_type,
asset_name,
asset_project,
asset_data,
PROJECT_V3_GET_REQUESTS_RATE_LIMITER,
config,
)
if asset_data.get("deleted") is True:
logger.info(
f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port"
)
await ocean.unregister_raw(asset_type, [asset_resource_data])
else:
logger.info(
f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port"
)
await ocean.register_raw(asset_type, [asset_resource_data])
except AssetHasNoProjectAncestorError:
logger.exception(
f"Couldn't find project ancestor to asset {asset_name}. Other types of ancestors are not supported yet."
)
except Exception as e:
logger.exception(f"Got error {e} while processing a real time event")


@ocean.router.post("/events")
async def feed_events_callback(request: Request) -> Response:
async def feed_events_callback(
request: Request, background_tasks: BackgroundTasks
) -> Response:
"""
This is the real-time events handler. The subscription which is connected to the Feeds Topic will send events here once
the events are inserted into the Assets Inventory.
@@ -193,10 +252,7 @@ async def feed_events_callback(request: Request) -> Response:
matching_resource_configs = [
resource_config
for resource_config in resource_configs
if (
resource_config.kind == asset_type
and isinstance(resource_config.selector, GCPResourceSelector)
)
if (resource_config.kind == asset_type)
]
for matching_resource_config in matching_resource_configs:
selector = matching_resource_config.selector
@@ -205,30 +261,24 @@
getattr(selector, "preserve_api_response_case_style", False)
)
)
asset_resource_data = await feed_event_to_resource(
background_tasks.add_task(
process_realtime_event,
asset_type,
asset_name,
asset_project,
asset_data,
config,
)
if asset_data.get("deleted") is True:
logger.info(
f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port"
)
await ocean.unregister_raw(asset_type, [asset_resource_data])
else:
logger.info(
f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port"
)
await ocean.register_raw(asset_type, [asset_resource_data])
logger.info(
f"Added background task to process real-time event for kind: {asset_type} with name: {asset_name} from project: {asset_project}"
)
except AssetHasNoProjectAncestorError:
logger.exception(
f"Couldn't find project ancestor to asset {asset_name}. Other types of ancestors are not supported yet."
)
except GotFeedCreatedSuccessfullyMessageError:
logger.info("Assets Feed created successfully")
except Exception:
logger.exception("Got error while handling a real time event")
except Exception as e:
logger.exception(f"Got error {str(e)} while handling a real time event")
return Response(status_code=http.HTTPStatus.INTERNAL_SERVER_ERROR)
return Response(status_code=200)
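
The net effect in this file: the endpoint now acknowledges the Pub/Sub push immediately and defers the quota-bound work to a background task. A standalone sketch of that FastAPI pattern (generic names, not the integration's code):

```python
import asyncio

from fastapi import BackgroundTasks, FastAPI, Response

app = FastAPI()

async def slow_rate_limited_work(event_id: str) -> None:
    await asyncio.sleep(5)  # stand-in for quota-bound GCP calls
    print(f"processed {event_id}")

@app.post("/events")
async def events(background_tasks: BackgroundTasks) -> Response:
    # add_task schedules the coroutine to run after the response is sent,
    # so the push subscription never times out waiting on rate limits.
    background_tasks.add_task(slow_rate_limited_work, "event-123")
    return Response(status_code=200)
```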
2 changes: 1 addition & 1 deletion integrations/gcp/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gcp"
version = "0.1.97"
version = "0.1.98"
description = "A GCP ocean integration"
authors = ["Matan Geva <[email protected]>"]

