Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration][GCP] Added Rate Limiting for ProjectsV3GetRequestsPerMinutePerProject Quota #1304

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions integrations/gcp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.93 (2025-01-16)


### Improvements

- Added rate limiting support for `ProjectV3GetRequestsPerMinutePerProject` and `ProjectV3SearchRequestsPerMinutePerProject` to handle GCP project quota limits during resyncs and real-time event processing


## 0.1.92 (2025-01-16)


Expand Down
14 changes: 14 additions & 0 deletions integrations/gcp/gcp_core/helpers/ratelimiter/overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class PubSubAPI(ResourceBoundedSemaphore):
service = "pubsub.googleapis.com"


class CloudResourceManagerAPI(ResourceBoundedSemaphore):
    """Base rate-limit container for Cloud Resource Manager API quotas.

    Subclasses pick a concrete ``quota_id`` under this service.
    """

    # GCP service name used to look up quotas for this API.
    service = "cloudresourcemanager.googleapis.com"


class SearchAllResourcesQpmPerProject(CloudAssetAPI):
quota_id = "apiSearchAllResourcesQpmPerProject"
container_type = ContainerType.PROJECT
Expand All @@ -20,3 +24,13 @@ class SearchAllResourcesQpmPerProject(CloudAssetAPI):
class PubSubAdministratorPerMinutePerProject(PubSubAPI):
quota_id = "administratorPerMinutePerProject"
container_type = ContainerType.PROJECT


class ProjectGetRequestsPerMinutePerProject(CloudResourceManagerAPI):
    """Per-project, per-minute quota for Cloud Resource Manager get requests.

    NOTE(review): this quota_id is spelled ``ProjectV3...`` while the
    changelog entry says ``ProjectsV3...`` — confirm against the actual
    GCP quota ID for cloudresourcemanager.googleapis.com.
    """

    quota_id = "ProjectV3GetRequestsPerMinutePerProject"
    container_type = ContainerType.PROJECT


class ProjectSearchRequestsPerMinutePerProject(CloudResourceManagerAPI):
    """Per-project, per-minute quota for Cloud Resource Manager search requests."""

    quota_id = "ProjectV3SearchRequestsPerMinutePerProject"
    container_type = ContainerType.PROJECT
50 changes: 37 additions & 13 deletions integrations/gcp/gcp_core/search/resource_searches.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from google.pubsub_v1.services.publisher import PublisherAsyncClient
from google.pubsub_v1.services.subscriber import SubscriberAsyncClient
from loguru import logger

from gcp_core.utils import resolve_request_controllers
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_ITEM
from port_ocean.utils.cache import cache_iterator_result
from gcp_core.errors import ResourceNotFoundError
Expand Down Expand Up @@ -177,14 +179,15 @@ async def list_all_subscriptions_per_project(


@cache_iterator_result()
async def search_all_projects(**kwargs: Any) -> ASYNC_GENERATOR_RESYNC_TYPE:
    """Yield batches of all GCP projects visible to the service account.

    Args:
        **kwargs: Optional ``rate_limiter`` — presumably an async limiter
            that ``paginated_query`` uses to throttle ``search_projects``
            calls; ``None`` means unthrottled. TODO(review): confirm
            ``paginated_query`` accepts the limiter as its fifth
            positional argument.

    Yields:
        Lists of project resources parsed into dicts.
    """
    # Fix: the annotation was ``**kwargs: Dict[str, Any]``, which (a)
    # annotates each keyword VALUE as a dict — wrong, the value is a rate
    # limiter — and (b) uses ``Dict`` which this module does not import
    # (the sibling module failed CI with NameError for the same pattern).
    logger.info("Searching projects")
    async with ProjectsAsyncClient() as projects_client:
        async for projects in paginated_query(
            projects_client,
            "search_projects",
            {},
            lambda response: parse_protobuf_messages(response.projects),
            kwargs.get("rate_limiter"),
        ):
            yield projects

Expand Down Expand Up @@ -214,15 +217,28 @@ async def search_all_organizations() -> ASYNC_GENERATOR_RESYNC_TYPE:


async def get_single_project(
    project_name: str, config: Optional[ProtoConfig] = None, **kwargs: Any
) -> RAW_ITEM:
    """Fetch a single GCP project and parse it into a dict.

    Args:
        project_name: The project resource name passed to ``get_project``
            (presumably ``projects/{id}`` — confirm with callers).
        config: Optional proto-parsing configuration.
        **kwargs: Optional ``rate_limiter`` — an async context manager
            (e.g. aiolimiter.AsyncLimiter) used to throttle the call
            against the ProjectV3Get quota; ``None`` means unthrottled.

    Returns:
        The project resource parsed into a dict.
    """
    # Local import keeps this fix self-contained; nullcontext supports
    # ``async with`` on Python >= 3.10.
    from contextlib import nullcontext

    rate_limiter = kwargs.get("rate_limiter")
    async with ProjectsAsyncClient() as projects_client:
        if rate_limiter:
            logger.info(
                f"Executing get_single_project. Current rate limit: {rate_limiter.max_rate} requests per {rate_limiter.time_period} seconds."
            )
        # Fix: the previous version duplicated the whole
        # get_project/parse_protobuf_message call in both the limited and
        # unlimited branches; a no-op context when no limiter is supplied
        # keeps a single call site with identical behavior.
        async with rate_limiter if rate_limiter else nullcontext():
            return parse_protobuf_message(
                await projects_client.get_project(
                    name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT
                ),
                config,
            )


async def get_single_folder(
Expand Down Expand Up @@ -311,22 +327,28 @@ async def feed_event_to_resource(
config: Optional[ProtoConfig] = None,
) -> RAW_ITEM:
resource = None
live_event_projects_rate_limiter, _ = await resolve_request_controllers(
AssetTypesWithSpecialHandling.PROJECT,
quota_id="ProjectV3GetRequestsPerMinutePerProject",
)
if asset_data.get("deleted") is True:
resource = asset_data["priorAsset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config, rate_limiter=live_event_projects_rate_limiter
)
else:
match asset_type:
case AssetTypesWithSpecialHandling.TOPIC:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_topic(topic_name, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config
project_id, config, rate_limiter=live_event_projects_rate_limiter
)
case AssetTypesWithSpecialHandling.SUBSCRIPTION:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_subscription(topic_name, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config
project_id, config, rate_limiter=live_event_projects_rate_limiter
)
case AssetTypesWithSpecialHandling.FOLDER:
folder_id = asset_name.replace(
Expand All @@ -339,10 +361,12 @@ async def feed_event_to_resource(
)
resource = await get_single_organization(organization_id, config)
case AssetTypesWithSpecialHandling.PROJECT:
resource = await get_single_project(project_id, config)
resource = await get_single_project(
project_id, config, rate_limiter=live_event_projects_rate_limiter
)
case _:
resource = asset_data["asset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config
project_id, config, rate_limiter=live_event_projects_rate_limiter
)
return resource
39 changes: 36 additions & 3 deletions integrations/gcp/gcp_core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
from gcp_core.helpers.ratelimiter.overrides import (
SearchAllResourcesQpmPerProject,
PubSubAdministratorPerMinutePerProject,
ProjectGetRequestsPerMinutePerProject,
ProjectSearchRequestsPerMinutePerProject,
)

search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject()
pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject()
project_get_requests_per_minute_per_project = ProjectGetRequestsPerMinutePerProject()
project_search_requests_per_minute_per_project = (
ProjectSearchRequestsPerMinutePerProject()
)

EXTRA_PROJECT_FIELD = "__project"
DEFAULT_CREDENTIALS_FILE_PATH = (
Expand Down Expand Up @@ -181,10 +187,35 @@ def get_service_account_project_id() -> str:


async def get_quotas_for_project(
project_id: str, kind: str
project_id: str, kind: str, quota_id: Optional[str] = None
) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
try:
match kind:
case AssetTypesWithSpecialHandling.PROJECT:
if quota_id == "ProjectV3SearchRequestsPerMinutePerProject":
project_rate_limiter = (
await project_search_requests_per_minute_per_project.limiter(
project_id
)
)
project_semaphore = (
await project_search_requests_per_minute_per_project.semaphore(
project_id
)
)
return project_rate_limiter, project_semaphore
else:
project_rate_limiter = (
await project_get_requests_per_minute_per_project.limiter(
project_id
)
)
project_semaphore = (
await project_get_requests_per_minute_per_project.semaphore(
project_id
)
)
return project_rate_limiter, project_semaphore
case (
AssetTypesWithSpecialHandling.TOPIC
| AssetTypesWithSpecialHandling.SUBSCRIPTION
Expand Down Expand Up @@ -222,7 +253,9 @@ async def get_quotas_for_project(


async def resolve_request_controllers(
    kind: str, **kwargs: Any
) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
    """Resolve the rate limiter and semaphore for an asset kind.

    Fix: the annotation was ``**kwargs: Dict[str, Any]``; ``Dict`` is not
    imported in this module, which raised ``NameError: name 'Dict' is not
    defined`` at import time (see the CI failures on this PR). ``Any`` is
    also the correct per-value annotation for ``**kwargs``.

    Args:
        kind: Asset kind whose quotas should be resolved.
        **kwargs: Optional ``quota_id`` selecting a specific quota for
            PROJECT kinds (e.g. ProjectV3SearchRequestsPerMinutePerProject).

    Returns:
        Tuple of (AsyncLimiter, BoundedSemaphore) scoped to the service
        account's own project.
    """
    service_account_project_id = get_service_account_project_id()
    return await get_quotas_for_project(
        service_account_project_id, kind, quota_id=kwargs.get("quota_id")
    )
10 changes: 8 additions & 2 deletions integrations/gcp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ async def _resolve_resync_method_for_resource(
case AssetTypesWithSpecialHandling.ORGANIZATION:
return search_all_organizations()
case AssetTypesWithSpecialHandling.PROJECT:
return search_all_projects()
project_rate_limiter, _ = await resolve_request_controllers(
kind, quota_id="ProjectV3SearchRequestsPerMinutePerProject"
)
return search_all_projects(rate_limiter=project_rate_limiter)
case _:
asset_rate_limiter, asset_semaphore = await resolve_request_controllers(
kind
Expand Down Expand Up @@ -106,7 +109,10 @@ async def resync_organizations(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:

@ocean.on_resync(kind=AssetTypesWithSpecialHandling.PROJECT)
async def resync_projects(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
    """Resync all GCP projects, throttled by the project-search quota.

    Note: this diff span contained both the old unthrottled loop header and
    the new one; this is the merged (post-change) version.
    """
    # Rate-limit project searches against the
    # ProjectV3SearchRequestsPerMinutePerProject quota; the semaphore
    # returned alongside the limiter is unused here.
    resync_projects_rate_limiter, _ = await resolve_request_controllers(
        kind, quota_id="ProjectV3SearchRequestsPerMinutePerProject"
    )
    async for batch in search_all_projects(rate_limiter=resync_projects_rate_limiter):
        yield batch


Expand Down
2 changes: 1 addition & 1 deletion integrations/gcp/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gcp"
version = "0.1.92"
version = "0.1.93"
description = "A GCP ocean integration"
authors = ["Matan Geva <[email protected]>"]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Generator

Check failure on line 1 in integrations/gcp/tests/gcp_core/search/test_resource_searches.py

View workflow job for this annotation

GitHub Actions / JUnit Test Report

test_resource_searches.test_get_single_subscription

NameError: name 'Dict' is not defined
Raw output
self = <Coroutine test_get_single_subscription>

    def runtest(self) -> None:
        self.obj = wrap_in_sync(
            # https://github.com/pytest-dev/pytest-asyncio/issues/596
            self.obj,  # type: ignore[has-type]
        )
>       super().runtest()

.venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py:457: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py:929: in inner
    _loop.run_until_complete(task)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/asyncio/base_events.py:686: in run_until_complete
    return future.result()
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/unittest/mock.py:1410: in patched
    with self.decoration_helper(patched,
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/unittest/mock.py:1375: in decoration_helper
    arg = exit_stack.enter_context(patching)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/contextlib.py:526: in enter_context
    result = _enter(cm)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/unittest/mock.py:1451: in __enter__
    self.target = self.getter()
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/pkgutil.py:518: in resolve_name
    mod = importlib.import_module(s)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/importlib/__init__.py:90: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    import enum
    import base64
    import os
    import typing
    from collections.abc import MutableSequence
    from typing import Any, TypedDict, Tuple, Optional
    from gcp_core.errors import ResourceNotFoundError
    from loguru import logger
    import proto  # type: ignore
    from port_ocean.context.event import event
    from port_ocean.core.handlers.port_app_config.models import ResourceConfig
    
    from gcp_core.overrides import GCPCloudResourceConfig, ProtoConfig
    from port_ocean.context.ocean import ocean
    import json
    from pathlib import Path
    from gcp_core.helpers.ratelimiter.overrides import (
        SearchAllResourcesQpmPerProject,
        PubSubAdministratorPerMinutePerProject,
        ProjectGetRequestsPerMinutePerProject,
        ProjectSearchRequestsPerMinutePerProject,
    )
    
    search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject()
    pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject()
    project_get_requests_per_minute_per_project = ProjectGetRequestsPerMinutePerProject()
    project_search_requests_per_minute_per_project = (
        ProjectSearchRequestsPerMinutePerProject()
    )
    
    EXTRA_PROJECT_FIELD = "__project"
    DEFAULT_CREDENTIALS_FILE_PATH = (
        f"{Path.home()}/.config/gcloud/application_default_credentials.json"
    )
    
    if typing.TYPE_CHECKING:
        from aiolimiter import AsyncLimiter
        from asyncio import BoundedSemaphore
    
    
    class VersionedResource(TypedDict):
        version: int
        resource: dict[Any, Any]
    
    
    class AssetData(TypedDict):
        versioned_resources: list[VersionedResource]
    
    
    def parse_latest_resource_from_asset(asset_data: AssetData) -> dict[Any, Any]:
        """
        Parse the latest version of a resource from asset data.
    
        Attempts to find the versioned resources using either snake_case or camelCase key,
        as the input format depends on how the asset data was originally serialized.
    
        Args:
            asset_data: Asset data containing versioned resources
    
        Returns:
            dict: The most recent version of the resource
    
        Raises:
            ResourceNotFoundError: If neither versioned_resources nor versionedResources is found
        """
        # Try both key formats since we don't control the input format
        versioned_resources = asset_data.get("versioned_resources") or asset_data.get(
            "versionedResources"
        )
        if not isinstance(versioned_resources, list):
            raise ResourceNotFoundError(
                "Could not find versioned resources under either 'versioned_resources' or 'versionedResources'. "
                "Please ensure the asset data contains a list of versioned resources in the expected format."
            )
    
        # Ensure each item in the list is a VersionedResource
        versioned_resources = typing.cast(list[VersionedResource], versioned_resources)
    
        max_versioned_resource_data = max(versioned_resources, key=lambda x: x["version"])
        return max_versioned_resource_data["resource"]
    
    
    def should_use_snake_case() -> bool:
        """
        Determines whether to use snake_case for field names based on preserve_api_response_case_style config.
    
        Returns:
            bool: True to use snake_case, False to preserve API's original case style
        """
    
        selector = get_current_resource_config().selector
        preserve_api_case = getattr(selector, "preserve_api_response_case_style", False)
        return not preserve_api_case
    
    
    def parse_protobuf_message(
        message: proto.Message,
        config: Optional[ProtoConfig] = None,
    ) -> dict[str, Any]:
        """
        Parse protobuf message to dict, controlling field name case style.
        """
        if config and config.preserving_proto_field_name is not None:
            use_snake_case = not config.preserving_proto_field_name
            return proto.Message.to_dict(
                message, preserving_proto_field_name=use_snake_case
            )
        use_snake_case = should_use_snake_case()
        return proto.Message.to_dict(message, preserving_proto_field_name=use_snake_case)
    
    
    def parse_protobuf_messages(
        messages: MutableSequence[proto.Message],
    ) -> list[dict[str, Any]]:
        return [parse_protobuf_message(message) for message in messages]
    
    
    class AssetTypesWithSpecialHandling(enum.StrEnum):
        TOPIC = "pubsub.googleapis.com/Topic"
        SUBSCRIPTION = "pubsub.googleapis.com/Subscription"
        PROJECT = "cloudresourcemanager.googleapis.com/Project"
        ORGANIZATION = "cloudresourcemanager.googleapis.com/Organization"
        FOLDER = "cloudresourcemanager.googleapis.com/Folder"
        CLOUD_RESOURCE = "cloudResource"
    
    
    def get_current_resource_config() -> (
        typing.Union[ResourceConfig, GCPCloudResourceConfig]
    ):
        """
        Returns the current resource config, accessible only inside an event context
        """
        return typing.cast(
            typing.Union[ResourceConfig, GCPCloudResourceConfig], event.resource_config
        )
    
    
    def get_credentials_json() -> str:
        credentials_json = ""
        if ocean.integration_config.get("encoded_adc_configuration"):
            b64_credentials = ocean.integration_config["encoded_adc_configuration"]
            credentials_json = base64.b64decode(b64_credentials).decode("utf-8")
        else:
            try:
                file_path: str = (
                    os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
                    or DEFAULT_CREDENTIALS_FILE_PATH
                )
                with open(file_path, "r", encoding="utf-8") as file:
                    credentials_json = file.read()
            except FileNotFoundError as e:
                raise FileNotFoundError(
                    f"Couldn't find the google credentials file. Please set the GOOGLE_APPLICATION_CREDENTIALS environment variable properly. Error: {str(e)}"
                )
        return credentials_json
    
    
    def get_service_account_project_id() -> str:
        "get project id associated with service account"
        try:
            default_credentials = json.loads(get_credentials_json())
            project_id = default_credentials.get("project_id") or default_credentials.get(
                "quota_project_id"
            )
    
            if not project_id:
                raise KeyError("project_id or quota_project_id")
    
            return project_id
        except FileNotFoundError as e:
            gcp_project_env = os.getenv("GCP_PROJECT")
            if isinstance(gcp_project_env, str):
                return gcp_project_env
            else:
                raise ValueError(
                    f"Couldn't figure out the service account's project id. You can specify it using the GCP_PROJECT environment variable. Error: {str(e)}"
                )
        except KeyError as e:
            raise ValueError(
                f"Couldn't figure out the service account's project id. Key: {str(e)} doesn't exist in the credentials file."
            )
        except Exception as e:
            raise ValueError(
                f"Couldn't figure out the service account's project id. Error: {str(e)}"
            )
        raise ValueError("Couldn't figure out the service account's project id.")
    
    
    async def get_quotas_for_project(
        project_id: str, kind: str, quota_id: Optional[str] = None
    ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
        try:
            match kind:
                case AssetTypesWithSpecialHandling.PROJECT:
                    if quota_id == "ProjectV3SearchRequestsPerMinutePerProject":
                        project_rate_limiter = (
                            await project_search_requests_per_minute_per_project.limiter(
                                project_id
                            )
                        )
                        project_semaphore = (
                            await project_search_requests_per_minute_per_project.semaphore(
                                project_id
                            )
                        )
                        return project_rate_limiter, project_semaphore
                    else:
                        project_rate_limiter = (
                            await project_get_requests_per_minute_per_project.limiter(
                                project_id
                            )
                        )
                        project_semaphore = (
                            await project_get_requests_per_minute_per_project.semaphore(
                                project_id
                            )
                        )
                        return project_rate_limiter, project_semaphore
                case (
                    AssetTypesWithSpecialHandling.TOPIC
                    | AssetTypesWithSpecialHandling.SUBSCRIPTION
                ):
                    topic_rate_limiter = (
                        await pubsub_administrator_per_minute_per_project.limiter(
                            project_id
                        )
                    )
                    topic_semaphore = (
                        await pubsub_administrator_per_minute_per_project.semaphore(
                            project_id
                        )
                    )
                    return (topic_rate_limiter, topic_semaphore)
                case _:
                    asset_rate_limiter = await search_all_resources_qpm_per_project.limiter(
                        project_id
                    )
                    asset_semaphore = await search_all_resources_qpm_per_project.semaphore(
                        project_id
                    )
                    return (asset_rate_limiter, asset_semaphore)
        except Exception as e:
            logger.warning(
                f"Failed to compute quota dynamically due to error. Will use default values. Error: {str(e)}"
            )
            default_rate_limiter = (
                await search_all_resources_qpm_per_project.default_rate_limiter()
            )
            default_semaphore = (
                await search_all_resources_qpm_per_project.default_semaphore()
            )
            return (default_rate_limiter, default_semaphore)
    
    
    async def resolve_request_controllers(
>       kind: str, **kwargs: Dict[str, Any]
    ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
E   NameError: name 'Dict' is not defined

gcp_core/utils.py:256: NameError

Check failure on line 1 in integrations/gcp/tests/gcp_core/search/test_resource_searches.py

View workflow job for this annotation

GitHub Actions / JUnit Test Report

test_resource_searches.test_feed_to_resource

NameError: name 'Dict' is not defined
Raw output
self = <Coroutine test_feed_to_resource>

    def runtest(self) -> None:
        self.obj = wrap_in_sync(
            # https://github.com/pytest-dev/pytest-asyncio/issues/596
            self.obj,  # type: ignore[has-type]
        )
>       super().runtest()

.venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py:457: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py:929: in inner
    _loop.run_until_complete(task)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/asyncio/base_events.py:686: in run_until_complete
    return future.result()
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/unittest/mock.py:1410: in patched
    with self.decoration_helper(patched,
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/unittest/mock.py:1375: in decoration_helper
    arg = exit_stack.enter_context(patching)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/contextlib.py:526: in enter_context
    result = _enter(cm)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/unittest/mock.py:1451: in __enter__
    self.target = self.getter()
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/pkgutil.py:518: in resolve_name
    mod = importlib.import_module(s)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/importlib/__init__.py:90: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    import enum
    import base64
    import os
    import typing
    from collections.abc import MutableSequence
    from typing import Any, TypedDict, Tuple, Optional
    from gcp_core.errors import ResourceNotFoundError
    from loguru import logger
    import proto  # type: ignore
    from port_ocean.context.event import event
    from port_ocean.core.handlers.port_app_config.models import ResourceConfig
    
    from gcp_core.overrides import GCPCloudResourceConfig, ProtoConfig
    from port_ocean.context.ocean import ocean
    import json
    from pathlib import Path
    from gcp_core.helpers.ratelimiter.overrides import (
        SearchAllResourcesQpmPerProject,
        PubSubAdministratorPerMinutePerProject,
        ProjectGetRequestsPerMinutePerProject,
        ProjectSearchRequestsPerMinutePerProject,
    )
    
    search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject()
    pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject()
    project_get_requests_per_minute_per_project = ProjectGetRequestsPerMinutePerProject()
    project_search_requests_per_minute_per_project = (
        ProjectSearchRequestsPerMinutePerProject()
    )
    
    EXTRA_PROJECT_FIELD = "__project"
    DEFAULT_CREDENTIALS_FILE_PATH = (
        f"{Path.home()}/.config/gcloud/application_default_credentials.json"
    )
    
    if typing.TYPE_CHECKING:
        from aiolimiter import AsyncLimiter
        from asyncio import BoundedSemaphore
    
    
    class VersionedResource(TypedDict):
        version: int
        resource: dict[Any, Any]
    
    
    class AssetData(TypedDict):
        versioned_resources: list[VersionedResource]
    
    
    def parse_latest_resource_from_asset(asset_data: AssetData) -> dict[Any, Any]:
        """
        Parse the latest version of a resource from asset data.
    
        Attempts to find the versioned resources using either snake_case or camelCase key,
        as the input format depends on how the asset data was originally serialized.
    
        Args:
            asset_data: Asset data containing versioned resources
    
        Returns:
            dict: The most recent version of the resource
    
        Raises:
            ResourceNotFoundError: If neither versioned_resources nor versionedResources is found
        """
        # Try both key formats since we don't control the input format
        versioned_resources = asset_data.get("versioned_resources") or asset_data.get(
            "versionedResources"
        )
        if not isinstance(versioned_resources, list):
            raise ResourceNotFoundError(
                "Could not find versioned resources under either 'versioned_resources' or 'versionedResources'. "
                "Please ensure the asset data contains a list of versioned resources in the expected format."
            )
    
        # Ensure each item in the list is a VersionedResource
        versioned_resources = typing.cast(list[VersionedResource], versioned_resources)
    
        max_versioned_resource_data = max(versioned_resources, key=lambda x: x["version"])
        return max_versioned_resource_data["resource"]
    
    
    def should_use_snake_case() -> bool:
        """
        Determines whether to use snake_case for field names based on preserve_api_response_case_style config.
    
        Returns:
            bool: True to use snake_case, False to preserve API's original case style
        """
    
        selector = get_current_resource_config().selector
        preserve_api_case = getattr(selector, "preserve_api_response_case_style", False)
        return not preserve_api_case
    
    
    def parse_protobuf_message(
        message: proto.Message,
        config: Optional[ProtoConfig] = None,
    ) -> dict[str, Any]:
        """
        Parse protobuf message to dict, controlling field name case style.
        """
        if config and config.preserving_proto_field_name is not None:
            use_snake_case = not config.preserving_proto_field_name
            return proto.Message.to_dict(
                message, preserving_proto_field_name=use_snake_case
            )
        use_snake_case = should_use_snake_case()
        return proto.Message.to_dict(message, preserving_proto_field_name=use_snake_case)
    
    
    def parse_protobuf_messages(
        messages: MutableSequence[proto.Message],
    ) -> list[dict[str, Any]]:
        return [parse_protobuf_message(message) for message in messages]
    
    
    class AssetTypesWithSpecialHandling(enum.StrEnum):
        TOPIC = "pubsub.googleapis.com/Topic"
        SUBSCRIPTION = "pubsub.googleapis.com/Subscription"
        PROJECT = "cloudresourcemanager.googleapis.com/Project"
        ORGANIZATION = "cloudresourcemanager.googleapis.com/Organization"
        FOLDER = "cloudresourcemanager.googleapis.com/Folder"
        CLOUD_RESOURCE = "cloudResource"
    
    
    def get_current_resource_config() -> (
        typing.Union[ResourceConfig, GCPCloudResourceConfig]
    ):
        """
        Returns the current resource config, accessible only inside an event context
        """
        return typing.cast(
            typing.Union[ResourceConfig, GCPCloudResourceConfig], event.resource_config
        )
    
    
    def get_credentials_json() -> str:
        credentials_json = ""
        if ocean.integration_config.get("encoded_adc_configuration"):
            b64_credentials = ocean.integration_config["encoded_adc_configuration"]
            credentials_json = base64.b64decode(b64_credentials).decode("utf-8")
        else:
            try:
                file_path: str = (
                    os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
                    or DEFAULT_CREDENTIALS_FILE_PATH
                )
                with open(file_path, "r", encoding="utf-8") as file:
                    credentials_json = file.read()
            except FileNotFoundError as e:
                raise FileNotFoundError(
                    f"Couldn't find the google credentials file. Please set the GOOGLE_APPLICATION_CREDENTIALS environment variable properly. Error: {str(e)}"
                )
        return credentials_json
    
    
    def get_service_account_project_id() -> str:
        """Resolve the project id associated with the service account.

        Resolution order:
          1. ``project_id`` (or ``quota_project_id``) from the ADC credentials
             returned by ``get_credentials_json``.
          2. The ``GCP_PROJECT`` environment variable, if the credentials file
             is missing.

        Raises:
            ValueError: If the project id cannot be determined from either source.
        """
        try:
            default_credentials = json.loads(get_credentials_json())
            project_id = default_credentials.get("project_id") or default_credentials.get(
                "quota_project_id"
            )

            if not project_id:
                # Reuse the missing-key error path so both "key absent" and
                # "key present but empty" produce the same ValueError below.
                raise KeyError("project_id or quota_project_id")

            return project_id
        except FileNotFoundError as e:
            gcp_project_env = os.getenv("GCP_PROJECT")
            if isinstance(gcp_project_env, str):
                return gcp_project_env
            else:
                raise ValueError(
                    f"Couldn't figure out the service account's project id. You can specify it using the GCP_PROJECT environment variable. Error: {str(e)}"
                )
        except KeyError as e:
            raise ValueError(
                f"Couldn't figure out the service account's project id. Key: {str(e)} doesn't exist in the credentials file."
            )
        except Exception as e:
            raise ValueError(
                f"Couldn't figure out the service account's project id. Error: {str(e)}"
            )
        # NOTE: the previous trailing `raise ValueError(...)` after the
        # try/except was unreachable (every path returns or raises) and has
        # been removed.
    
    
    async def get_quotas_for_project(
        project_id: str, kind: str, quota_id: Optional[str] = None
    ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
        """Return the (rate limiter, semaphore) pair governing requests for a project.

        The quota API is chosen by asset ``kind`` (and, for project assets, by
        ``quota_id``); on any failure the default search-all-resources
        controllers are returned instead.
        """
        try:
            # Pick the quota API whose limits apply to this asset kind.
            if kind == AssetTypesWithSpecialHandling.PROJECT:
                if quota_id == "ProjectV3SearchRequestsPerMinutePerProject":
                    quota_api = project_search_requests_per_minute_per_project
                else:
                    quota_api = project_get_requests_per_minute_per_project
            elif kind in (
                AssetTypesWithSpecialHandling.TOPIC,
                AssetTypesWithSpecialHandling.SUBSCRIPTION,
            ):
                quota_api = pubsub_administrator_per_minute_per_project
            else:
                quota_api = search_all_resources_qpm_per_project

            rate_limiter = await quota_api.limiter(project_id)
            semaphore = await quota_api.semaphore(project_id)
            return rate_limiter, semaphore
        except Exception as e:
            logger.warning(
                f"Failed to compute quota dynamically due to error. Will use default values. Error: {str(e)}"
            )
            fallback_rate_limiter = (
                await search_all_resources_qpm_per_project.default_rate_limiter()
            )
            fallback_semaphore = (
                await search_all_resources_qpm_per_project.default_semaphore()
            )
            return fallback_rate_limiter, fallback_semaphore
    
    
    async def resolve_request_controllers(
>       kind: str, **kwargs: Dict[str, Any]
    ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
E   NameError: name 'Dict' is not defined

gcp_core/utils.py:256: NameError

Check failure on line 1 in integrations/gcp/tests/gcp_core/search/test_resource_searches.py

View workflow job for this annotation

GitHub Actions / JUnit Test Report

test_resource_searches.test_preserve_case_style_combined

NameError: name 'Dict' is not defined
Raw output
self = <Coroutine test_preserve_case_style_combined>

    def runtest(self) -> None:
        self.obj = wrap_in_sync(
            # https://github.com/pytest-dev/pytest-asyncio/issues/596
            self.obj,  # type: ignore[has-type]
        )
>       super().runtest()

.venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py:457: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py:929: in inner
    _loop.run_until_complete(task)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/asyncio/base_events.py:686: in run_until_complete
    return future.result()
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/unittest/mock.py:1410: in patched
    with self.decoration_helper(patched,
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/unittest/mock.py:1375: in decoration_helper
    arg = exit_stack.enter_context(patching)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/contextlib.py:526: in enter_context
    result = _enter(cm)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/unittest/mock.py:1451: in __enter__
    self.target = self.getter()
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/pkgutil.py:518: in resolve_name
    mod = importlib.import_module(s)
/opt/hostedtoolcache/Python/3.12.8/x64/lib/python3.12/importlib/__init__.py:90: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    import enum
    import base64
    import os
    import typing
    from collections.abc import MutableSequence
    from typing import Any, TypedDict, Tuple, Optional
    from gcp_core.errors import ResourceNotFoundError
    from loguru import logger
    import proto  # type: ignore
    from port_ocean.context.event import event
    from port_ocean.core.handlers.port_app_config.models import ResourceConfig
    
    from gcp_core.overrides import GCPCloudResourceConfig, ProtoConfig
    from port_ocean.context.ocean import ocean
    import json
    from pathlib import Path
    from gcp_core.helpers.ratelimiter.overrides import (
        SearchAllResourcesQpmPerProject,
        PubSubAdministratorPerMinutePerProject,
        ProjectGetRequestsPerMinutePerProject,
        ProjectSearchRequestsPerMinutePerProject,
    )
    
    search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject()
    pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject()
    project_get_requests_per_minute_per_project = ProjectGetRequestsPerMinutePerProject()
    project_search_requests_per_minute_per_project = (
        ProjectSearchRequestsPerMinutePerProject()
    )
    
    EXTRA_PROJECT_FIELD = "__project"
    DEFAULT_CREDENTIALS_FILE_PATH = (
        f"{Path.home()}/.config/gcloud/application_default_credentials.json"
    )
    
    if typing.TYPE_CHECKING:
        from aiolimiter import AsyncLimiter
        from asyncio import BoundedSemaphore
    
    
    class VersionedResource(TypedDict):
        version: int
        resource: dict[Any, Any]
    
    
    class AssetData(TypedDict):
        versioned_resources: list[VersionedResource]
    
    
    def parse_latest_resource_from_asset(asset_data: AssetData) -> dict[Any, Any]:
        """
        Parse the latest version of a resource from asset data.
    
        Attempts to find the versioned resources using either snake_case or camelCase key,
        as the input format depends on how the asset data was originally serialized.
    
        Args:
            asset_data: Asset data containing versioned resources
    
        Returns:
            dict: The most recent version of the resource
    
        Raises:
            ResourceNotFoundError: If neither versioned_resources nor versionedResources is found
        """
        # Try both key formats since we don't control the input format
        versioned_resources = asset_data.get("versioned_resources") or asset_data.get(
            "versionedResources"
        )
        if not isinstance(versioned_resources, list):
            raise ResourceNotFoundError(
                "Could not find versioned resources under either 'versioned_resources' or 'versionedResources'. "
                "Please ensure the asset data contains a list of versioned resources in the expected format."
            )
    
        # Ensure each item in the list is a VersionedResource
        versioned_resources = typing.cast(list[VersionedResource], versioned_resources)
    
        max_versioned_resource_data = max(versioned_resources, key=lambda x: x["version"])
        return max_versioned_resource_data["resource"]
    
    
    def should_use_snake_case() -> bool:
        """
        Determines whether to use snake_case for field names based on preserve_api_response_case_style config.
    
        Returns:
            bool: True to use snake_case, False to preserve API's original case style
        """
    
        selector = get_current_resource_config().selector
        preserve_api_case = getattr(selector, "preserve_api_response_case_style", False)
        return not preserve_api_case
    
    
    def parse_protobuf_message(
        message: proto.Message,
        config: Optional[ProtoConfig] = None,
    ) -> dict[str, Any]:
        """
        Parse protobuf message to dict, controlling field name case style.
        """
        if config and config.preserving_proto_field_name is not None:
            use_snake_case = not config.preserving_proto_field_name
            return proto.Message.to_dict(
                message, preserving_proto_field_name=use_snake_case
            )
        use_snake_case = should_use_snake_case()
        return proto.Message.to_dict(message, preserving_proto_field_name=use_snake_case)
    
    
    def parse_protobuf_messages(
        messages: MutableSequence[proto.Message],
    ) -> list[dict[str, Any]]:
        return [parse_protobuf_message(message) for message in messages]
    
    
    class AssetTypesWithSpecialHandling(enum.StrEnum):
        TOPIC = "pubsub.googleapis.com/Topic"
        SUBSCRIPTION = "pubsub.googleapis.com/Subscription"
        PROJECT = "cloudresourcemanager.googleapis.com/Project"
        ORGANIZATION = "cloudresourcemanager.googleapis.com/Organization"
        FOLDER = "cloudresourcemanager.googleapis.com/Folder"
        CLOUD_RESOURCE = "cloudResource"
    
    
    def get_current_resource_config() -> (
        typing.Union[ResourceConfig, GCPCloudResourceConfig]
    ):
        """
        Returns the current resource config, accessible only inside an event context
        """
        return typing.cast(
            typing.Union[ResourceConfig, GCPCloudResourceConfig], event.resource_config
        )
    
    
    def get_credentials_json() -> str:
        credentials_json = ""
        if ocean.integration_config.get("encoded_adc_configuration"):
            b64_credentials = ocean.integration_config["encoded_adc_configuration"]
            credentials_json = base64.b64decode(b64_credentials).decode("utf-8")
        else:
            try:
                file_path: str = (
                    os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
                    or DEFAULT_CREDENTIALS_FILE_PATH
                )
                with open(file_path, "r", encoding="utf-8") as file:
                    credentials_json = file.read()
            except FileNotFoundError as e:
                raise FileNotFoundError(
                    f"Couldn't find the google credentials file. Please set the GOOGLE_APPLICATION_CREDENTIALS environment variable properly. Error: {str(e)}"
                )
        return credentials_json
    
    
    def get_service_account_project_id() -> str:
        "get project id associated with service account"
        try:
            default_credentials = json.loads(get_credentials_json())
            project_id = default_credentials.get("project_id") or default_credentials.get(
                "quota_project_id"
            )
    
            if not project_id:
                raise KeyError("project_id or quota_project_id")
    
            return project_id
        except FileNotFoundError as e:
            gcp_project_env = os.getenv("GCP_PROJECT")
            if isinstance(gcp_project_env, str):
                return gcp_project_env
            else:
                raise ValueError(
                    f"Couldn't figure out the service account's project id. You can specify it using the GCP_PROJECT environment variable. Error: {str(e)}"
                )
        except KeyError as e:
            raise ValueError(
                f"Couldn't figure out the service account's project id. Key: {str(e)} doesn't exist in the credentials file."
            )
        except Exception as e:
            raise ValueError(
                f"Couldn't figure out the service account's project id. Error: {str(e)}"
            )
        raise ValueError("Couldn't figure out the service account's project id.")
    
    
    async def get_quotas_for_project(
        project_id: str, kind: str, quota_id: Optional[str] = None
    ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
        try:
            match kind:
                case AssetTypesWithSpecialHandling.PROJECT:
                    if quota_id == "ProjectV3SearchRequestsPerMinutePerProject":
                        project_rate_limiter = (
                            await project_search_requests_per_minute_per_project.limiter(
                                project_id
                            )
                        )
                        project_semaphore = (
                            await project_search_requests_per_minute_per_project.semaphore(
                                project_id
                            )
                        )
                        return project_rate_limiter, project_semaphore
                    else:
                        project_rate_limiter = (
                            await project_get_requests_per_minute_per_project.limiter(
                                project_id
                            )
                        )
                        project_semaphore = (
                            await project_get_requests_per_minute_per_project.semaphore(
                                project_id
                            )
                        )
                        return project_rate_limiter, project_semaphore
                case (
                    AssetTypesWithSpecialHandling.TOPIC
                    | AssetTypesWithSpecialHandling.SUBSCRIPTION
                ):
                    topic_rate_limiter = (
                        await pubsub_administrator_per_minute_per_project.limiter(
                            project_id
                        )
                    )
                    topic_semaphore = (
                        await pubsub_administrator_per_minute_per_project.semaphore(
                            project_id
                        )
                    )
                    return (topic_rate_limiter, topic_semaphore)
                case _:
                    asset_rate_limiter = await search_all_resources_qpm_per_project.limiter(
                        project_id
                    )
                    asset_semaphore = await search_all_resources_qpm_per_project.semaphore(
                        project_id
                    )
                    return (asset_rate_limiter, asset_semaphore)
        except Exception as e:
            logger.warning(
                f"Failed to compute quota dynamically due to error. Will use default values. Error: {str(e)}"
            )
            default_rate_limiter = (
                await search_all_resources_qpm_per_project.default_rate_limiter()
            )
            default_semaphore = (
                await search_all_resources_qpm_per_project.default_semaphore()
            )
            return (default_rate_limiter, default_semaphore)
    
    
    async def resolve_request_controllers(
>       kind: str, **kwargs: Dict[str, Any]
    ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
E   NameError: name 'Dict' is not defined

gcp_core/utils.py:256: NameError
from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from port_ocean.context.event import event_context
Expand Down Expand Up @@ -39,7 +39,7 @@
@patch("google.pubsub_v1.services.subscriber.SubscriberAsyncClient", new=AsyncMock)
async def test_list_all_subscriptions_per_project(integration_config_mock: Any) -> None:
# Arrange
from gcp_core.search.resource_searches import list_all_subscriptions_per_project

Check failure on line 42 in integrations/gcp/tests/gcp_core/search/test_resource_searches.py

View workflow job for this annotation

GitHub Actions / JUnit Test Report

test_resource_searches.test_list_all_subscriptions_per_project

NameError: name 'Dict' is not defined
Raw output
integration_config_mock = <MagicMock name='integration_config' id='140297925315088'>

    @pytest.mark.asyncio
    @patch("gcp_core.search.paginated_query.paginated_query", new=mock_subscription_pages)
    @patch("google.pubsub_v1.services.subscriber.SubscriberAsyncClient", new=AsyncMock)
    async def test_list_all_subscriptions_per_project(integration_config_mock: Any) -> None:
        # Arrange
>       from gcp_core.search.resource_searches import list_all_subscriptions_per_project

tests/gcp_core/search/test_resource_searches.py:42: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
gcp_core/search/resource_searches.py:17: in <module>
    from gcp_core.utils import resolve_request_controllers
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    import enum
    import base64
    import os
    import typing
    from collections.abc import MutableSequence
    from typing import Any, TypedDict, Tuple, Optional
    from gcp_core.errors import ResourceNotFoundError
    from loguru import logger
    import proto  # type: ignore
    from port_ocean.context.event import event
    from port_ocean.core.handlers.port_app_config.models import ResourceConfig
    
    from gcp_core.overrides import GCPCloudResourceConfig, ProtoConfig
    from port_ocean.context.ocean import ocean
    import json
    from pathlib import Path
    from gcp_core.helpers.ratelimiter.overrides import (
        SearchAllResourcesQpmPerProject,
        PubSubAdministratorPerMinutePerProject,
        ProjectGetRequestsPerMinutePerProject,
        ProjectSearchRequestsPerMinutePerProject,
    )
    
    search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject()
    pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject()
    project_get_requests_per_minute_per_project = ProjectGetRequestsPerMinutePerProject()
    project_search_requests_per_minute_per_project = (
        ProjectSearchRequestsPerMinutePerProject()
    )
    
    EXTRA_PROJECT_FIELD = "__project"
    DEFAULT_CREDENTIALS_FILE_PATH = (
        f"{Path.home()}/.config/gcloud/application_default_credentials.json"
    )
    
    if typing.TYPE_CHECKING:
        from aiolimiter import AsyncLimiter
        from asyncio import BoundedSemaphore
    
    
    class VersionedResource(TypedDict):
        version: int
        resource: dict[Any, Any]
    
    
    class AssetData(TypedDict):
        versioned_resources: list[VersionedResource]
    
    
    def parse_latest_resource_from_asset(asset_data: AssetData) -> dict[Any, Any]:
        """
        Parse the latest version of a resource from asset data.
    
        Attempts to find the versioned resources using either snake_case or camelCase key,
        as the input format depends on how the asset data was originally serialized.
    
        Args:
            asset_data: Asset data containing versioned resources
    
        Returns:
            dict: The most recent version of the resource
    
        Raises:
            ResourceNotFoundError: If neither versioned_resources nor versionedResources is found
        """
        # Try both key formats since we don't control the input format
        versioned_resources = asset_data.get("versioned_resources") or asset_data.get(
            "versionedResources"
        )
        if not isinstance(versioned_resources, list):
            raise ResourceNotFoundError(
                "Could not find versioned resources under either 'versioned_resources' or 'versionedResources'. "
                "Please ensure the asset data contains a list of versioned resources in the expected format."
            )
    
        # Ensure each item in the list is a VersionedResource
        versioned_resources = typing.cast(list[VersionedResource], versioned_resources)
    
        max_versioned_resource_data = max(versioned_resources, key=lambda x: x["version"])
        return max_versioned_resource_data["resource"]
    
    
    def should_use_snake_case() -> bool:
        """
        Determines whether to use snake_case for field names based on preserve_api_response_case_style config.
    
        Returns:
            bool: True to use snake_case, False to preserve API's original case style
        """
    
        selector = get_current_resource_config().selector
        preserve_api_case = getattr(selector, "preserve_api_response_case_style", False)
        return not preserve_api_case
    
    
    def parse_protobuf_message(
        message: proto.Message,
        config: Optional[ProtoConfig] = None,
    ) -> dict[str, Any]:
        """
        Parse protobuf message to dict, controlling field name case style.
        """
        if config and config.preserving_proto_field_name is not None:
            use_snake_case = not config.preserving_proto_field_name
            return proto.Message.to_dict(
                message, preserving_proto_field_name=use_snake_case
            )
        use_snake_case = should_use_snake_case()
        return proto.Message.to_dict(message, preserving_proto_field_name=use_snake_case)
    
    
    def parse_protobuf_messages(
        messages: MutableSequence[proto.Message],
    ) -> list[dict[str, Any]]:
        return [parse_protobuf_message(message) for message in messages]
    
    
    class AssetTypesWithSpecialHandling(enum.StrEnum):
        TOPIC = "pubsub.googleapis.com/Topic"
        SUBSCRIPTION = "pubsub.googleapis.com/Subscription"
        PROJECT = "cloudresourcemanager.googleapis.com/Project"
        ORGANIZATION = "cloudresourcemanager.googleapis.com/Organization"
        FOLDER = "cloudresourcemanager.googleapis.com/Folder"
        CLOUD_RESOURCE = "cloudResource"
    
    
    def get_current_resource_config() -> (
        typing.Union[ResourceConfig, GCPCloudResourceConfig]
    ):
        """
        Returns the current resource config, accessible only inside an event context
        """
        return typing.cast(
            typing.Union[ResourceConfig, GCPCloudResourceConfig], event.resource_config
        )
    
    
    def get_credentials_json() -> str:
        credentials_json = ""
        if ocean.integration_config.get("encoded_adc_configuration"):
            b64_credentials = ocean.integration_config["encoded_adc_configuration"]
            credentials_json = base64.b64decode(b64_credentials).decode("utf-8")
        else:
            try:
                file_path: str = (
                    os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
                    or DEFAULT_CREDENTIALS_FILE_PATH
                )
                with open(file_path, "r", encoding="utf-8") as file:
                    credentials_json = file.read()
            except FileNotFoundError as e:
                raise FileNotFoundError(
                    f"Couldn't find the google credentials file. Please set the GOOGLE_APPLICATION_CREDENTIALS environment variable properly. Error: {str(e)}"
                )
        return credentials_json
    
    
    def get_service_account_project_id() -> str:
        "get project id associated with service account"
        try:
            default_credentials = json.loads(get_credentials_json())
            project_id = default_credentials.get("project_id") or default_credentials.get(
                "quota_project_id"
            )
    
            if not project_id:
                raise KeyError("project_id or quota_project_id")
    
            return project_id
        except FileNotFoundError as e:
            gcp_project_env = os.getenv("GCP_PROJECT")
            if isinstance(gcp_project_env, str):
                return gcp_project_env
            else:
                raise ValueError(
                    f"Couldn't figure out the service account's project id. You can specify it using the GCP_PROJECT environment variable. Error: {str(e)}"
                )
        except KeyError as e:
            raise ValueError(
                f"Couldn't figure out the service account's project id. Key: {str(e)} doesn't exist in the credentials file."
            )
        except Exception as e:
            raise ValueError(
                f"Couldn't figure out the service account's project id. Error: {str(e)}"
            )
        raise ValueError("Couldn't figure out the service account's project id.")
    
    
    async def get_quotas_for_project(
        project_id: str, kind: str, quota_id: Optional[str] = None
    ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
        try:
            match kind:
                case AssetTypesWithSpecialHandling.PROJECT:
                    if quota_id == "ProjectV3SearchRequestsPerMinutePerProject":
                        project_rate_limiter = (
                            await project_search_requests_per_minute_per_project.limiter(
                                project_id
                            )
                        )
                        project_semaphore = (
                            await project_search_requests_per_minute_per_project.semaphore(
                                project_id
                            )
                        )
                        return project_rate_limiter, project_semaphore
                    else:
                        project_rate_limiter = (
                            await project_get_requests_per_minute_per_project.limiter(
                                project_id
                            )
                        )
                        project_semaphore = (
                            await project_get_requests_per_minute_per_project.semaphore(
                                project_id
                            )
                        )
                        return project_rate_limiter, project_semaphore
                case (
                    AssetTypesWithSpecialHandling.TOPIC
                    | AssetTypesWithSpecialHandling.SUBSCRIPTION
                ):
                    topic_rate_limiter = (
                        await pubsub_administrator_per_minute_per_project.limiter(
                            project_id
                        )
                    )
                    topic_semaphore = (
                        await pubsub_administrator_per_minute_per_project.semaphore(
                            project_id
                        )
                    )
                    return (topic_rate_limiter, topic_semaphore)
                case _:
                    asset_rate_limiter = await search_all_resources_qpm_per_project.limiter(
                        project_id
                    )
                    asset_semaphore = await search_all_resources_qpm_per_project.semaphore(
                        project_id
                    )
                    return (asset_rate_limiter, asset_semaphore)
        except Exception as e:
            logger.warning(
                f"Failed to compute quota dynamically due to error. Will use default values. Error: {str(e)}"
            )
            default_rate_limiter = (
                await search_all_resources_qpm_per_project.default_rate_limiter()
            )
            default_semaphore = (
                await search_all_resources_qpm_per_project.default_semaphore()
            )
            return (default_rate_limiter, default_semaphore)
    
    
    async def resolve_request_controllers(
>       kind: str, **kwargs: Dict[str, Any]
    ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
E   NameError: name 'Dict' is not defined

gcp_core/utils.py:256: NameError

expected_subscriptions = [
{"__project": {"name": "project_name"}, "name": "subscription_1"},
Expand Down Expand Up @@ -104,9 +104,12 @@


@pytest.mark.asyncio
@patch("gcp_core.utils.resolve_request_controllers")
@patch("gcp_core.utils.get_current_resource_config")
async def test_feed_to_resource(
get_current_resource_config_mock: MagicMock, monkeypatch: Any
get_current_resource_config_mock: MagicMock,
resolve_request_controllers_mock: AsyncMock,
monkeypatch: Any,
) -> None:
# Arrange
projects_async_client_mock = AsyncMock
Expand Down Expand Up @@ -136,6 +139,10 @@

from gcp_core.search.resource_searches import feed_event_to_resource

# Mock resolve_request_controllers
mock_rate_limiter = AsyncMock()
resolve_request_controllers_mock.return_value = (mock_rate_limiter, None)

mock_asset_name = "projects/project_name/topics/topic_name"
mock_asset_type = "pubsub.googleapis.com/Topic"
mock_asset_project_name = "project_name"
Expand Down
Loading