From 99db061943260a8186353b3852189a76a947eb96 Mon Sep 17 00:00:00 2001
From: yair
Date: Wed, 16 Oct 2024 10:09:24 +0300
Subject: [PATCH] revert ocean

---
 integrations/jira/jira/client.py | 77 ++++++++++++++++++--------------
 port_ocean/utils/repeat.py       | 44 +++++++++---------
 2 files changed, 63 insertions(+), 58 deletions(-)

diff --git a/integrations/jira/jira/client.py b/integrations/jira/jira/client.py
index 850b3afaad..1fe111f9b9 100644
--- a/integrations/jira/jira/client.py
+++ b/integrations/jira/jira/client.py
@@ -1,13 +1,13 @@
 import typing
 from typing import Any, AsyncGenerator
 
+import aiohttp
 from aiohttp import ClientTimeout, BasicAuth
 from loguru import logger
 
 from jira.overrides import JiraResourceConfig
 from port_ocean.context.event import event
 from port_ocean.context.ocean import ocean
-from port_ocean.utils import http_async_client
 
 PAGE_SIZE = 50
 WEBHOOK_NAME = "Port-Ocean-Events-Webhook"
@@ -38,13 +38,12 @@ def __init__(self, jira_url: str, jira_email: str, jira_token: str) -> None:
         self.api_url = f"{self.jira_rest_url}/api/3"
         self.webhooks_url = f"{self.jira_rest_url}/webhooks/1.0/webhook"
 
-        self.client = http_async_client
-        self.client._default_auth = self.jira_api_auth
-        self.client._timeout = ClientTimeout(30)
+    def _create_session(self):
+        return aiohttp.ClientSession(auth=self.jira_api_auth, timeout=ClientTimeout(30))
 
     @staticmethod
     def _generate_base_req_params(
-        maxResults: int = 0, startAt: int = 0
+        maxResults: int = 0, startAt: int = 0
     ) -> dict[str, Any]:
         return {
             "maxResults": maxResults,
@@ -52,27 +51,33 @@ def _generate_base_req_params(
         }
 
     async def _get_paginated_projects(self, params: dict[str, Any]) -> dict[str, Any]:
-        project_response = await self.client.get(
-            f"{self.api_url}/project/search", params=params
-        )
-        project_response.raise_for_status()
-        return await project_response.json()
+        async with self._create_session() as session:
+            async with session.get(
+                f"{self.api_url}/project/search", params=params
+            ) as project_response:
+                await session.close()
+                project_response.raise_for_status()
+                return await project_response.json()
 
     async def _get_paginated_issues(self, params: dict[str, Any]) -> dict[str, Any]:
-        issue_response = await self.client.get(f"{self.api_url}/search", params=params)
-        issue_response.raise_for_status()
-        return await issue_response.json()
+        async with self._create_session() as session:
+            async with session.get(f"{self.api_url}/search", params=params) as issue_response:
+                await session.close()
+                issue_response.raise_for_status()
+                return await issue_response.json()
 
     async def create_events_webhook(self, app_host: str) -> None:
         webhook_target_app_host = f"{app_host}/integration/webhook"
-        webhook_check_response = await self.client.get(f"{self.webhooks_url}")
-        webhook_check_response.raise_for_status()
-        webhook_check = await webhook_check_response.json()
+        async with self._create_session() as session:
+            async with session.get(f"{self.webhooks_url}") as webhook_check_response:
+                await session.close()
+                webhook_check_response.raise_for_status()
+                webhook_check = await webhook_check_response.json()
 
-        for webhook in webhook_check:
-            if webhook["url"] == webhook_target_app_host:
-                logger.info("Ocean real time reporting webhook already exists")
-                return
+                for webhook in webhook_check:
+                    if webhook["url"] == webhook_target_app_host:
+                        logger.info("Ocean real time reporting webhook already exists")
+                        return
 
         body = {
             "name": f"{ocean.config.integration.identifier}-{WEBHOOK_NAME}",
@@ -80,21 +85,23 @@ async def create_events_webhook(self, app_host: str) -> None:
             "events": WEBHOOK_EVENTS,
         }
 
-        webhook_create_response = await self.client.post(
-            f"{self.webhooks_url}", json=body
-        )
-        webhook_create_response.raise_for_status()
-        logger.info("Ocean real time reporting webhook created")
+        async with self._create_session() as session:
+            async with session.post(f"{self.webhooks_url}", json=body) as webhook_create_response:
+                await session.close()
+                webhook_create_response.raise_for_status()
+                logger.info("Ocean real time reporting webhook created")
 
     async def get_single_project(self, project_key: str) -> dict[str, Any]:
-        project_response = await self.client.get(
-            f"{self.api_url}/project/{project_key}"
-        )
-        project_response.raise_for_status()
-        return await project_response.json()
+        async with self._create_session() as session:
+            async with session.get(
+                f"{self.api_url}/project/{project_key}"
+            ) as project_response:
+                await session.close()
+                project_response.raise_for_status()
+                return await project_response.json()
 
     async def get_paginated_projects(
-        self,
+        self,
     ) -> AsyncGenerator[list[dict[str, Any]], None]:
         logger.info("Getting projects from Jira")
 
@@ -117,9 +124,11 @@ async def get_paginated_projects(
             params["startAt"] += PAGE_SIZE
 
     async def get_single_issue(self, issue_key: str) -> dict[str, Any]:
-        issue_response = await self.client.get(f"{self.api_url}/issue/{issue_key}")
-        issue_response.raise_for_status()
-        return await issue_response.json()
+        async with self._create_session() as session:
+            async with session.get(f"{self.api_url}/issue/{issue_key}") as issue_response:
+                await session.close()
+                issue_response.raise_for_status()
+                return await issue_response.json()
 
     async def get_paginated_issues(self) -> AsyncGenerator[list[dict[str, Any]], None]:
         logger.info("Getting issues from Jira")
diff --git a/port_ocean/utils/repeat.py b/port_ocean/utils/repeat.py
index d762650299..dcbddc8e39 100644
--- a/port_ocean/utils/repeat.py
+++ b/port_ocean/utils/repeat.py
@@ -5,6 +5,7 @@
 from typing import Callable, Coroutine, Any
 
 from loguru import logger
+from starlette.concurrency import run_in_threadpool
 
 from port_ocean.utils.signal import signal_handler
 
@@ -15,10 +16,6 @@
 ]
 
 
-def run_in_threadpool(param):
-    pass
-
-
 def repeat_every(
     seconds: float,
     wait_first: bool = False,
@@ -61,27 +58,26 @@ async def wrapped() -> None:
 
             async def loop() -> None:
                 nonlocal repetitions
-                # if wait_first:
-                #     await asyncio.sleep(seconds)
-                # count the repetition even if an exception is raised
-                repetitions += 1
-                try:
-                    if is_coroutine:
-                        task = asyncio.create_task(func())
-                        signal_handler.register(lambda: task.cancel())
-                        ensure_future(task)
-                    else:
-                        run_in_threadpool(func)
-                except Exception as exc:
-                    formatted_exception = "".join(
-                        format_exception(type(exc), exc, exc.__traceback__)
-                    )
-                    logger.error(formatted_exception)
-                    if raise_exceptions:
-                        raise exc
-                await asyncio.sleep(seconds)
-                if max_repetitions is None or repetitions < max_repetitions:
-                    asyncio.get_event_loop().call_later(seconds, loop)
+                if wait_first:
+                    await asyncio.sleep(seconds)
+                while max_repetitions is None or repetitions < max_repetitions:
+                    # count the repetition even if an exception is raised
+                    repetitions += 1
+                    try:
+                        if is_coroutine:
+                            task = asyncio.create_task(func())
+                            signal_handler.register(lambda: task.cancel())
+                            ensure_future(task)
+                        else:
+                            await run_in_threadpool(func)
+                    except Exception as exc:
+                        formatted_exception = "".join(
+                            format_exception(type(exc), exc, exc.__traceback__)
+                        )
+                        logger.error(formatted_exception)
+                        if raise_exceptions:
+                            raise exc
+                    await asyncio.sleep(seconds)
 
             ensure_future(loop())
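
Note on the client.py hunks: they replace the shared port_ocean http_async_client with a short-lived aiohttp.ClientSession created per request through the new _create_session() helper. A minimal standalone sketch of that pattern follows; the function name, URL, and credentials below are illustrative and not part of the patch. In the sketch the two async-with blocks release the response and close the session on exit, so no explicit session.close() call is needed.

import asyncio
from typing import Any

import aiohttp
from aiohttp import BasicAuth, ClientTimeout


async def get_json(url: str, email: str, token: str) -> dict[str, Any]:
    # One short-lived session per request, as in the patch's _create_session().
    # Both context managers clean up the response and the session when the
    # block exits, including on exceptions.
    async with aiohttp.ClientSession(
        auth=BasicAuth(email, token), timeout=ClientTimeout(total=30)
    ) as session:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.json()


# Example (hypothetical values):
# asyncio.run(get_json("https://your-domain.atlassian.net/rest/api/3/myself",
#                      "me@example.com", "api-token"))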
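Note on the repeat.py hunk: it drops the local run_in_threadpool stub and the call_later-based scheduling, and restores a polling loop inside repeat_every: an optional initial sleep, then a while loop that counts repetitions, schedules coroutine functions as tasks, pushes synchronous functions to a worker thread via starlette's run_in_threadpool, and sleeps between iterations. A simplified, self-contained sketch of that loop, with assumptions noted in comments (the function and parameter names are illustrative; the real decorator wraps this logic in nested closures and logs through loguru):

import asyncio
import logging

from starlette.concurrency import run_in_threadpool


async def run_periodically(
    func,
    seconds: float,
    wait_first: bool = False,
    max_repetitions: int | None = None,
) -> None:
    # Optionally wait one interval before the first run, as the patch does.
    if wait_first:
        await asyncio.sleep(seconds)
    repetitions = 0
    while max_repetitions is None or repetitions < max_repetitions:
        # Count the repetition even if the call raises.
        repetitions += 1
        try:
            if asyncio.iscoroutinefunction(func):
                # The patched decorator schedules the coroutine as a task and
                # registers a cancel handler; awaiting it here keeps the sketch short.
                await func()
            else:
                # Run blocking callables in a worker thread.
                await run_in_threadpool(func)
        except Exception:
            # The real decorator formats the traceback, logs it, and can re-raise.
            logging.exception("periodic task failed")
        await asyncio.sleep(seconds)


# Example: print "tick" three times, one second apart.
# asyncio.run(run_periodically(lambda: print("tick"), seconds=1.0, max_repetitions=3))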