Skip to content

Commit

Permalink
[Integration][Gitlab] Revert changes made in an attempt to resolve race …
Browse files Browse the repository at this point in the history
…conditions (#1343)

# Description

What - Revert changes made to the integration in 0.2.1

Why - 0.2.1 made significant changes to real-time events handling.
I'm releasing this in an attempt to rule out the chance that the issue of
merge request events not syncing was sparked by 0.2.1. If it actually
was, then we'd have to trace the background of the problem 0.2.1 sought
to solve and re-solve it.

How - Restore real-time events handling to its state prior to 0.2.1

## Type of change

Please leave one option from the following and delete the rest:

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

<h4> All tests should be run against the Port production
environment (using a testing org). </h4>

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector


### Integration testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.

---------

Co-authored-by: tankilevitch <[email protected]>
  • Loading branch information
mk-armah and Tankilevitch authored Jan 27, 2025
1 parent fd553e0 commit dbff904
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 83 deletions.
8 changes: 8 additions & 0 deletions integrations/gitlab/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

0.2.28 (2025-01-27)
===================

### Bug Fixes

- Revert changes made in 0.2.1


0.2.27 (2025-01-23)
===================

Expand Down
132 changes: 55 additions & 77 deletions integrations/gitlab/gitlab_integration/events/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
event_context,
EventContext,
)
import time

Observer = Callable[[str, dict[str, Any]], Awaitable[Any]]

Expand All @@ -29,53 +28,51 @@ def __init__(self) -> None:
async def _start_event_processor(self) -> None:
logger.info(f"Started {self.__class__.__name__} worker")
while True:
event_ctx, event_id, body = await self.webhook_tasks_queue.get()
logger.debug(
f"Retrieved event: {event_id} from Queue, notifying observers",
queue_size=self.webhook_tasks_queue.qsize(),
)
try:
async with event_context(
"gitlab_http_event_async_worker", parent_override=event_ctx
):
await self._notify(event_id, body)
except Exception as e:
logger.error(
f"Error notifying observers for event: {event_id}, error: {e}"
)
finally:
logger.info(
f"Processed event {event_id}",
event_id=event_id,
event_context=event_ctx.id,
event_ctx, event, body = await self.webhook_tasks_queue.get()
with logger.contextualize(
event_context=event_ctx.id,
event_type=event_ctx.event_type,
event_id=event_ctx.id,
event=event,
):
logger.debug(
f"Retrieved event: {event} from Queue, notifying observers",
queue_size=self.webhook_tasks_queue.qsize(),
)
self.webhook_tasks_queue.task_done()
try:
async with event_context(
"gitlab_http_event_async_worker", parent_override=event_ctx
):
await self._notify(event, body)
except Exception as e:
logger.error(
f"Error notifying observers for event: {event}, error: {e}"
)
finally:
logger.info(
f"Processed event {event}",
)
self.webhook_tasks_queue.task_done()

async def start_event_processor(self) -> None:
asyncio.create_task(self._start_event_processor())

@abstractmethod
async def _notify(self, event_id: str, body: dict[str, Any]) -> None:
async def _notify(self, event: str, body: dict[str, Any]) -> None:
pass

async def notify(self, event_id: str, body: dict[str, Any]) -> None:
logger.debug(
f"Received event: {event_id}, putting it in Queue for processing",
event_context=current_event_context.id,
)
async def notify(self, event: str, body: dict[str, Any]) -> None:
logger.debug(f"Received event: {event}, putting it in Queue for processing")
await self.webhook_tasks_queue.put(
(
deepcopy(current_event_context),
event_id,
event,
body,
)
)


class EventHandler(BaseEventHandler):
MAXIMUM_RETRIES = 3
TIMEOUT = 90

def __init__(self) -> None:
super().__init__()
self._observers: dict[str, list[Observer]] = defaultdict(list)
Expand All @@ -84,46 +81,24 @@ def on(self, events: list[str], observer: Observer) -> None:
for event in events:
self._observers[event].append(observer)

async def _notify(self, event_id: str, body: dict[str, Any]) -> None:
observers_list = self._observers.get(event_id, [])
async def _notify(self, event: str, body: dict[str, Any]) -> None:
observers_list = self._observers.get(event, [])

if not observers_list:
logger.info(
f"event: {event_id} has no matching handler. the handlers available are for events: {self._observers.keys()}"
f"event: {event} has no matching handler. the handlers available are for events: {self._observers.keys()}"
)
return
for observer in observers_list:
retries_left = self.MAXIMUM_RETRIES
observer_time = time.time()
while retries_left > 0:
try:
if asyncio.iscoroutinefunction(observer):
if inspect.ismethod(observer):
handler = observer.__self__.__class__.__name__
logger.debug(
f"Notifying observer: {handler}, for event: {event_id} at {observer_time}",
event_id=event_id,
handler=handler,
)
await asyncio.wait_for(
observer(event_id, body), self.TIMEOUT
) # Sequentially call each observer
logger.debug(
f"Observer {handler} completed work at {time.time() - observer_time}",
event_id=event_id,
handler=handler,
)
break
except asyncio.TimeoutError:
logger.error(
f"{handler} started work at {observer_time}, did not complete handling event {event_id} within {self.TIMEOUT} seconds, retrying"
)
retries_left -= 1
except Exception as e:
logger.error(
f"Error processing event {event_id} with observer {observer}: {e}",
exc_info=True,
if asyncio.iscoroutinefunction(observer):
if inspect.ismethod(observer):
handler = observer.__self__.__class__.__name__
logger.debug(
f"Notifying observer: {handler}, for event: {event}",
event=event,
handler=handler,
)
break
asyncio.create_task(observer(event, deepcopy(body))) # type: ignore


class SystemEventHandler(BaseEventHandler):
Expand All @@ -139,17 +114,20 @@ def on(self, hook_handler: Type[HookHandler]) -> None:
def add_client(self, client: GitlabService) -> None:
self._clients.append(client)

async def _notify(self, event_id: str, body: dict[str, Any]) -> None:
async def _notify(self, event: str, body: dict[str, Any]) -> None:
# best effort to notify using all clients, as we don't know which one of the clients have the permission to
# access the project
for client in self._clients:
for hook_handler_class in self._hook_handlers.get(event_id, []):
try:
hook_handler_instance = hook_handler_class(client)
await hook_handler_instance.on_hook(
event_id, body
) # Sequentially process handlers
except Exception as e:
logger.error(
f"Error processing event {event_id} with handler {hook_handler_class.__name__} for client {client}: {str(e)}"
)
results = await asyncio.gather(
*(
hook_handler(client).on_hook(event, deepcopy(body))
for client in self._clients
for hook_handler in self._hook_handlers.get(event, [])
),
return_exceptions=True,
)

for result in results:
if isinstance(result, Exception):
logger.error(
f"Failed to notify observer for event: {event}, error: {result}"
)
16 changes: 11 additions & 5 deletions integrations/gitlab/gitlab_integration/gitlab_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,9 @@ async def _get_entities_from_git(
project.files.get, file_path, sha
)

entities = yaml.safe_load(file_content.decode())
entities = await anyio.to_thread.run_sync(
yaml.safe_load, file_content.decode()
)
raw_entities = [
Entity(**entity_data)
for entity_data in (
Expand Down Expand Up @@ -818,7 +820,7 @@ async def get_entities_diff(

return entities_before, entities_after

def _parse_file_content(
async def _parse_file_content(
self, project: Project, file: ProjectFile
) -> Union[str, dict[str, Any], list[Any]] | None:
"""
Expand All @@ -833,13 +835,17 @@ def _parse_file_content(
)
return None
try:
return json.loads(file.decode())
return await anyio.to_thread.run_sync(json.loads, file.decode())
except json.JSONDecodeError:
try:
logger.debug(
f"Trying to process file {file.file_path} in project {project.path_with_namespace} as YAML"
)
documents = list(yaml.load_all(file.decode(), Loader=yaml.SafeLoader))
documents = list(
await anyio.to_thread.run_sync(
yaml.load_all, file.decode(), yaml.SafeLoader
)
)
if not documents:
logger.debug(
f"Failed to parse file {file.file_path} in project {project.path_with_namespace} as YAML,"
Expand Down Expand Up @@ -868,7 +874,7 @@ async def get_and_parse_single_file(
f"Fetched file {file_path} in project {project.path_with_namespace}"
)
project_file = typing.cast(ProjectFile, project_file)
parsed_file = self._parse_file_content(project, project_file)
parsed_file = await self._parse_file_content(project, project_file)
project_file_dict = project_file.asdict()

if not parsed_file:
Expand Down
2 changes: 1 addition & 1 deletion integrations/gitlab/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gitlab"
version = "0.2.27"
version = "0.2.28"
description = "Gitlab integration for Port using Port-Ocean Framework"
authors = ["Yair Siman-Tov <[email protected]>"]

Expand Down

0 comments on commit dbff904

Please sign in to comment.