diff --git a/integrations/gitlab/CHANGELOG.md b/integrations/gitlab/CHANGELOG.md index a319ae92ec..df72180b86 100644 --- a/integrations/gitlab/CHANGELOG.md +++ b/integrations/gitlab/CHANGELOG.md @@ -7,6 +7,14 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +0.2.28 (2025-01-27) +=================== + +### Bug Fixes + +- Revert changes made in 0.2.1 + + 0.2.27 (2025-01-23) =================== diff --git a/integrations/gitlab/gitlab_integration/events/event_handler.py b/integrations/gitlab/gitlab_integration/events/event_handler.py index f380143999..004e86740a 100644 --- a/integrations/gitlab/gitlab_integration/events/event_handler.py +++ b/integrations/gitlab/gitlab_integration/events/event_handler.py @@ -15,7 +15,6 @@ event_context, EventContext, ) -import time Observer = Callable[[str, dict[str, Any]], Awaitable[Any]] @@ -29,53 +28,51 @@ def __init__(self) -> None: async def _start_event_processor(self) -> None: logger.info(f"Started {self.__class__.__name__} worker") while True: - event_ctx, event_id, body = await self.webhook_tasks_queue.get() - logger.debug( - f"Retrieved event: {event_id} from Queue, notifying observers", - queue_size=self.webhook_tasks_queue.qsize(), - ) - try: - async with event_context( - "gitlab_http_event_async_worker", parent_override=event_ctx - ): - await self._notify(event_id, body) - except Exception as e: - logger.error( - f"Error notifying observers for event: {event_id}, error: {e}" - ) - finally: - logger.info( - f"Processed event {event_id}", - event_id=event_id, - event_context=event_ctx.id, + event_ctx, event, body = await self.webhook_tasks_queue.get() + with logger.contextualize( + event_context=event_ctx.id, + event_type=event_ctx.event_type, + event_id=event_ctx.id, + event=event, + ): + logger.debug( + f"Retrieved event: {event} from Queue, notifying observers", + queue_size=self.webhook_tasks_queue.qsize(), ) - self.webhook_tasks_queue.task_done() + try: + async with event_context( + "gitlab_http_event_async_worker", parent_override=event_ctx + ): + await self._notify(event, body) + except Exception as e: + logger.error( + f"Error notifying observers for event: {event}, error: {e}" + ) + finally: + logger.info( + f"Processed event {event}", + ) + self.webhook_tasks_queue.task_done() async def start_event_processor(self) -> None: asyncio.create_task(self._start_event_processor()) @abstractmethod - async def _notify(self, event_id: str, body: dict[str, Any]) -> None: + async def _notify(self, event: str, body: dict[str, Any]) -> None: pass - async def notify(self, event_id: str, body: dict[str, Any]) -> None: - logger.debug( - f"Received event: {event_id}, putting it in Queue for processing", - event_context=current_event_context.id, - ) + async def notify(self, event: str, body: dict[str, Any]) -> None: + logger.debug(f"Received event: {event}, putting it in Queue for processing") await self.webhook_tasks_queue.put( ( deepcopy(current_event_context), - event_id, + event, body, ) ) class EventHandler(BaseEventHandler): - MAXIMUM_RETRIES = 3 - TIMEOUT = 90 - def __init__(self) -> None: super().__init__() self._observers: dict[str, list[Observer]] = defaultdict(list) @@ -84,46 +81,24 @@ def on(self, events: list[str], observer: Observer) -> None: for event in events: self._observers[event].append(observer) - async def _notify(self, event_id: str, body: dict[str, Any]) -> None: - observers_list = self._observers.get(event_id, []) + async def _notify(self, event: str, body: dict[str, Any]) -> None: + observers_list = self._observers.get(event, []) + if not observers_list: logger.info( - f"event: {event_id} has no matching handler. the handlers available are for events: {self._observers.keys()}" + f"event: {event} has no matching handler. the handlers available are for events: {self._observers.keys()}" ) return for observer in observers_list: - retries_left = self.MAXIMUM_RETRIES - observer_time = time.time() - while retries_left > 0: - try: - if asyncio.iscoroutinefunction(observer): - if inspect.ismethod(observer): - handler = observer.__self__.__class__.__name__ - logger.debug( - f"Notifying observer: {handler}, for event: {event_id} at {observer_time}", - event_id=event_id, - handler=handler, - ) - await asyncio.wait_for( - observer(event_id, body), self.TIMEOUT - ) # Sequentially call each observer - logger.debug( - f"Observer {handler} completed work at {time.time() - observer_time}", - event_id=event_id, - handler=handler, - ) - break - except asyncio.TimeoutError: - logger.error( - f"{handler} started work at {observer_time}, did not complete handling event {event_id} within {self.TIMEOUT} seconds, retrying" - ) - retries_left -= 1 - except Exception as e: - logger.error( - f"Error processing event {event_id} with observer {observer}: {e}", - exc_info=True, + if asyncio.iscoroutinefunction(observer): + if inspect.ismethod(observer): + handler = observer.__self__.__class__.__name__ + logger.debug( + f"Notifying observer: {handler}, for event: {event}", + event=event, + handler=handler, ) - break + asyncio.create_task(observer(event, deepcopy(body))) # type: ignore class SystemEventHandler(BaseEventHandler): @@ -139,17 +114,20 @@ def on(self, hook_handler: Type[HookHandler]) -> None: def add_client(self, client: GitlabService) -> None: self._clients.append(client) - async def _notify(self, event_id: str, body: dict[str, Any]) -> None: + async def _notify(self, event: str, body: dict[str, Any]) -> None: # best effort to notify using all clients, as we don't know which one of the clients have the permission to # access the project - for client in self._clients: - for hook_handler_class in self._hook_handlers.get(event_id, []): - try: - hook_handler_instance = hook_handler_class(client) - await hook_handler_instance.on_hook( - event_id, body - ) # Sequentially process handlers - except Exception as e: - logger.error( - f"Error processing event {event_id} with handler {hook_handler_class.__name__} for client {client}: {str(e)}" - ) + results = await asyncio.gather( + *( + hook_handler(client).on_hook(event, deepcopy(body)) + for client in self._clients + for hook_handler in self._hook_handlers.get(event, []) + ), + return_exceptions=True, + ) + + for result in results: + if isinstance(result, Exception): + logger.error( + f"Failed to notify observer for event: {event}, error: {result}" + ) diff --git a/integrations/gitlab/gitlab_integration/gitlab_service.py b/integrations/gitlab/gitlab_integration/gitlab_service.py index 5847353177..e02a45c6fe 100644 --- a/integrations/gitlab/gitlab_integration/gitlab_service.py +++ b/integrations/gitlab/gitlab_integration/gitlab_service.py @@ -228,7 +228,9 @@ async def _get_entities_from_git( project.files.get, file_path, sha ) - entities = yaml.safe_load(file_content.decode()) + entities = await anyio.to_thread.run_sync( + yaml.safe_load, file_content.decode() + ) raw_entities = [ Entity(**entity_data) for entity_data in ( @@ -818,7 +820,7 @@ async def get_entities_diff( return entities_before, entities_after - def _parse_file_content( + async def _parse_file_content( self, project: Project, file: ProjectFile ) -> Union[str, dict[str, Any], list[Any]] | None: """ @@ -833,13 +835,17 @@ def _parse_file_content( ) return None try: - return json.loads(file.decode()) + return await anyio.to_thread.run_sync(json.loads, file.decode()) except json.JSONDecodeError: try: logger.debug( f"Trying to process file {file.file_path} in project {project.path_with_namespace} as YAML" ) - documents = list(yaml.load_all(file.decode(), Loader=yaml.SafeLoader)) + documents = list( + await anyio.to_thread.run_sync( + yaml.load_all, file.decode(), yaml.SafeLoader + ) + ) if not documents: logger.debug( f"Failed to parse file {file.file_path} in project {project.path_with_namespace} as YAML," @@ -868,7 +874,7 @@ async def get_and_parse_single_file( f"Fetched file {file_path} in project {project.path_with_namespace}" ) project_file = typing.cast(ProjectFile, project_file) - parsed_file = self._parse_file_content(project, project_file) + parsed_file = await self._parse_file_content(project, project_file) project_file_dict = project_file.asdict() if not parsed_file: diff --git a/integrations/gitlab/pyproject.toml b/integrations/gitlab/pyproject.toml index f7a7cdcd4e..b821235033 100644 --- a/integrations/gitlab/pyproject.toml +++ b/integrations/gitlab/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gitlab" -version = "0.2.27" +version = "0.2.28" description = "Gitlab integration for Port using Port-Ocean Framework" authors = ["Yair Siman-Tov "]