Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration][GCP] Added Rate Limiting for ProjectsV3GetRequestsPerMinutePerProject Quota #1304

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions integrations/gcp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.90 (2025-01-09)


### Improvements

- Added rate limiting support for the `ProjectV3GetRequestsPerMinutePerProject` and `ProjectV3SearchRequestsPerMinutePerProject` quotas to handle GCP project quota limits during resyncs and real-time event processing.


## 0.1.89 (2025-01-08)


Expand Down
14 changes: 14 additions & 0 deletions integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class PubSubAPI(ResourceBoundedSemaphore):
service = "pubsub.googleapis.com"


class CloudResourceManagerAPI(ResourceBoundedSemaphore):
service = "cloudresourcemanager.googleapis.com"


class SearchAllResourcesQpmPerProject(CloudAssetAPI):
quota_id = "apiSearchAllResourcesQpmPerProject"
container_type = ContainerType.PROJECT
Expand All @@ -20,3 +24,13 @@ class SearchAllResourcesQpmPerProject(CloudAssetAPI):
class PubSubAdministratorPerMinutePerProject(PubSubAPI):
quota_id = "administratorPerMinutePerProject"
container_type = ContainerType.PROJECT


class ProjectGetRequestsPerMinutePerProject(CloudResourceManagerAPI):
quota_id = "ProjectV3GetRequestsPerMinutePerProject"
container_type = ContainerType.PROJECT


class ProjectSearchRequestsPerMinutePerProject(CloudResourceManagerAPI):
quota_id = "ProjectV3SearchRequestsPerMinutePerProject"
container_type = ContainerType.PROJECT
49 changes: 36 additions & 13 deletions integrations/gcp/gcp_core/search/resource_searches.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from google.pubsub_v1.services.publisher import PublisherAsyncClient
from google.pubsub_v1.services.subscriber import SubscriberAsyncClient
from loguru import logger

from gcp_core.utils import resolve_request_controllers
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_ITEM
from port_ocean.utils.cache import cache_iterator_result
from gcp_core.errors import ResourceNotFoundError
Expand Down Expand Up @@ -177,14 +179,15 @@ async def list_all_subscriptions_per_project(


@cache_iterator_result()
async def search_all_projects() -> ASYNC_GENERATOR_RESYNC_TYPE:
async def search_all_projects(**kwargs: Any) -> ASYNC_GENERATOR_RESYNC_TYPE:
oiadebayo marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Searching projects")
async with ProjectsAsyncClient() as projects_client:
async for projects in paginated_query(
projects_client,
"search_projects",
{},
lambda response: parse_protobuf_messages(response.projects),
kwargs.get("rate_limiter"),
):
yield projects

Expand Down Expand Up @@ -214,15 +217,28 @@ async def search_all_organizations() -> ASYNC_GENERATOR_RESYNC_TYPE:


async def get_single_project(
project_name: str, config: Optional[ProtoConfig] = None
project_name: str, config: Optional[ProtoConfig] = None, **kwargs: Any
) -> RAW_ITEM:
async with ProjectsAsyncClient() as projects_client:
return parse_protobuf_message(
await projects_client.get_project(
name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT
),
config,
)
rate_limiter = kwargs.get("rate_limiter")
if rate_limiter:
async with rate_limiter:
logger.info(
f"Executing get_single_project. Current rate limit: {rate_limiter.max_rate} requests per {rate_limiter.time_period} seconds."
)
return parse_protobuf_message(
await projects_client.get_project(
name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT
),
config,
)
else:
return parse_protobuf_message(
await projects_client.get_project(
name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT
),
config,
)


async def get_single_folder(
Expand Down Expand Up @@ -311,22 +327,27 @@ async def feed_event_to_resource(
config: Optional[ProtoConfig] = None,
) -> RAW_ITEM:
resource = None
live_event_projects_rate_limiter, _ = await resolve_request_controllers(
AssetTypesWithSpecialHandling.PROJECT, method="get"
oiadebayo marked this conversation as resolved.
Show resolved Hide resolved
)
if asset_data.get("deleted") is True:
resource = asset_data["priorAsset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config, rate_limiter=live_event_projects_rate_limiter
)
else:
match asset_type:
case AssetTypesWithSpecialHandling.TOPIC:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_topic(topic_name, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config
project_id, config, rate_limiter=live_event_projects_rate_limiter
)
case AssetTypesWithSpecialHandling.SUBSCRIPTION:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_subscription(topic_name, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config
project_id, config, rate_limiter=live_event_projects_rate_limiter
)
case AssetTypesWithSpecialHandling.FOLDER:
folder_id = asset_name.replace(
Expand All @@ -339,10 +360,12 @@ async def feed_event_to_resource(
)
resource = await get_single_organization(organization_id, config)
case AssetTypesWithSpecialHandling.PROJECT:
resource = await get_single_project(project_id, config)
resource = await get_single_project(
project_id, config, rate_limiter=live_event_projects_rate_limiter
)
case _:
resource = asset_data["asset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config
project_id, config, rate_limiter=live_event_projects_rate_limiter
)
return resource
38 changes: 35 additions & 3 deletions integrations/gcp/gcp_core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
from gcp_core.helpers.ratelimiter.overrides import (
SearchAllResourcesQpmPerProject,
PubSubAdministratorPerMinutePerProject,
ProjectGetRequestsPerMinutePerProject,
ProjectSearchRequestsPerMinutePerProject,
)

search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject()
pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject()
project_get_requests_per_minute_per_project = ProjectGetRequestsPerMinutePerProject()
project_search_requests_per_minute_per_project = (
ProjectSearchRequestsPerMinutePerProject()
)

EXTRA_PROJECT_FIELD = "__project"
DEFAULT_CREDENTIALS_FILE_PATH = (
Expand Down Expand Up @@ -181,10 +187,36 @@ def get_service_account_project_id() -> str:


async def get_quotas_for_project(
project_id: str, kind: str
project_id: str, kind: str, **kwargs: Any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
project_id: str, kind: str, **kwargs: Any
project_id: str, kind: str, quota_id: Optional[str] = None

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's be more precise here.

) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
try:
match kind:
case AssetTypesWithSpecialHandling.PROJECT:
method = kwargs.get("method")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
method = kwargs.get("method")

if method == "search":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if method == "search":
if quota_id == "apiSearchAllResourcesQpmPerProject":

project_rate_limiter = (
await project_search_requests_per_minute_per_project.limiter(
project_id
)
)
project_semaphore = (
await project_search_requests_per_minute_per_project.semaphore(
project_id
)
)
return project_rate_limiter, project_semaphore
else:
project_rate_limiter = (
await project_get_requests_per_minute_per_project.limiter(
project_id
)
)
project_semaphore = (
await project_get_requests_per_minute_per_project.semaphore(
project_id
)
)
return project_rate_limiter, project_semaphore
case (
AssetTypesWithSpecialHandling.TOPIC
| AssetTypesWithSpecialHandling.SUBSCRIPTION
Expand Down Expand Up @@ -222,7 +254,7 @@ async def get_quotas_for_project(


async def resolve_request_controllers(
kind: str,
kind: str, **kwargs: Any
oiadebayo marked this conversation as resolved.
Show resolved Hide resolved
) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
service_account_project_id = get_service_account_project_id()
return await get_quotas_for_project(service_account_project_id, kind)
return await get_quotas_for_project(service_account_project_id, kind, **kwargs)
10 changes: 8 additions & 2 deletions integrations/gcp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ async def _resolve_resync_method_for_resource(
case AssetTypesWithSpecialHandling.ORGANIZATION:
return search_all_organizations()
case AssetTypesWithSpecialHandling.PROJECT:
return search_all_projects()
project_rate_limiter, _ = await resolve_request_controllers(
kind, method="search"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adjust

)
return search_all_projects(rate_limiter=project_rate_limiter)
case _:
asset_rate_limiter, asset_semaphore = await resolve_request_controllers(
kind
Expand Down Expand Up @@ -106,7 +109,10 @@ async def resync_organizations(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:

@ocean.on_resync(kind=AssetTypesWithSpecialHandling.PROJECT)
async def resync_projects(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
async for batch in search_all_projects():
resync_projects_rate_limiter, _ = await resolve_request_controllers(
kind, method="search"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adjust

)
async for batch in search_all_projects(rate_limiter=resync_projects_rate_limiter):
yield batch


Expand Down
2 changes: 1 addition & 1 deletion integrations/gcp/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gcp"
version = "0.1.89"
version = "0.1.90"
description = "A GCP ocean integration"
authors = ["Matan Geva <[email protected]>"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,12 @@ async def test_get_single_subscription(


@pytest.mark.asyncio
@patch("gcp_core.utils.resolve_request_controllers")
@patch("gcp_core.utils.get_current_resource_config")
async def test_feed_to_resource(
get_current_resource_config_mock: MagicMock, monkeypatch: Any
get_current_resource_config_mock: MagicMock,
resolve_request_controllers_mock: AsyncMock,
monkeypatch: Any,
) -> None:
# Arrange
projects_async_client_mock = AsyncMock
Expand Down Expand Up @@ -136,6 +139,10 @@ async def test_feed_to_resource(

from gcp_core.search.resource_searches import feed_event_to_resource

# Mock resolve_request_controllers
mock_rate_limiter = AsyncMock()
resolve_request_controllers_mock.return_value = (mock_rate_limiter, None)

mock_asset_name = "projects/project_name/topics/topic_name"
mock_asset_type = "pubsub.googleapis.com/Topic"
mock_asset_project_name = "project_name"
Expand Down
Loading