Skip to content

Commit

Permalink
revert ocean
Browse files Browse the repository at this point in the history
  • Loading branch information
yairsimantov20 committed Oct 16, 2024
1 parent 2c0538b commit 99db061
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 58 deletions.
77 changes: 43 additions & 34 deletions integrations/jira/jira/client.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import typing
from typing import Any, AsyncGenerator

import aiohttp
from aiohttp import ClientTimeout, BasicAuth
from loguru import logger

from jira.overrides import JiraResourceConfig
from port_ocean.context.event import event
from port_ocean.context.ocean import ocean
from port_ocean.utils import http_async_client

PAGE_SIZE = 50
WEBHOOK_NAME = "Port-Ocean-Events-Webhook"
Expand Down Expand Up @@ -38,63 +38,70 @@ def __init__(self, jira_url: str, jira_email: str, jira_token: str) -> None:
self.api_url = f"{self.jira_rest_url}/api/3"
self.webhooks_url = f"{self.jira_rest_url}/webhooks/1.0/webhook"

self.client = http_async_client
self.client._default_auth = self.jira_api_auth
self.client._timeout = ClientTimeout(30)
def _create_session(self) -> aiohttp.ClientSession:
    """Create a new aiohttp session preconfigured with Jira basic auth and a 30s timeout."""
    timeout = ClientTimeout(30)
    return aiohttp.ClientSession(auth=self.jira_api_auth, timeout=timeout)

@staticmethod
def _generate_base_req_params(
    maxResults: int = 0, startAt: int = 0
) -> dict[str, Any]:
    """Build the pagination query params shared by Jira search endpoints.

    The camelCase names intentionally mirror Jira's query-parameter names.
    (The scraped diff duplicated the parameter line, which is a syntax
    error; this is the corrected single signature.)
    """
    return {
        "maxResults": maxResults,
        "startAt": startAt,
    }

async def _get_paginated_projects(self, params: dict[str, Any]) -> dict[str, Any]:
    """Fetch one page of projects from Jira's project search endpoint.

    :param params: pagination query params (maxResults/startAt).
    :return: the decoded JSON page payload.
    :raises aiohttp.ClientResponseError: on a non-2xx response.
    """
    async with self._create_session() as session:
        async with session.get(
            f"{self.api_url}/project/search", params=params
        ) as project_response:
            # Read the body while the response context is still open.
            # NOTE: the previous code awaited session.close() here, which
            # tears down the connector under the in-flight response; the
            # async-with blocks already close everything on exit.
            project_response.raise_for_status()
            return await project_response.json()

async def _get_paginated_issues(self, params: dict[str, Any]) -> dict[str, Any]:
    """Fetch one page of issues from Jira's search endpoint.

    :param params: pagination query params (maxResults/startAt) plus any JQL.
    :return: the decoded JSON page payload.
    :raises aiohttp.ClientResponseError: on a non-2xx response.
    """
    async with self._create_session() as session:
        async with session.get(
            f"{self.api_url}/search", params=params
        ) as issue_response:
            # Do not close the session before reading the body; the
            # async-with contexts handle cleanup after json() completes.
            issue_response.raise_for_status()
            return await issue_response.json()

async def create_events_webhook(self, app_host: str) -> None:
    """Ensure a Jira webhook targeting this integration's endpoint exists.

    Checks the currently registered webhooks and registers a new one only
    when no webhook already points at ``{app_host}/integration/webhook``.

    :param app_host: externally reachable base URL of the integration.
    :raises aiohttp.ClientResponseError: on a non-2xx response from Jira.
    """
    webhook_target_app_host = f"{app_host}/integration/webhook"
    # One session for both the existence check and the create call.
    async with self._create_session() as session:
        async with session.get(self.webhooks_url) as webhook_check_response:
            # Read the body before leaving the response context; closing the
            # session mid-response (as the old code did) breaks the read.
            webhook_check_response.raise_for_status()
            webhook_check = await webhook_check_response.json()

        for webhook in webhook_check:
            if webhook["url"] == webhook_target_app_host:
                logger.info("Ocean real time reporting webhook already exists")
                return

        body = {
            "name": f"{ocean.config.integration.identifier}-{WEBHOOK_NAME}",
            "url": webhook_target_app_host,
            "events": WEBHOOK_EVENTS,
        }

        async with session.post(
            self.webhooks_url, json=body
        ) as webhook_create_response:
            webhook_create_response.raise_for_status()
            logger.info("Ocean real time reporting webhook created")

async def get_single_project(self, project_key: str) -> dict[str, Any]:
    """Fetch a single Jira project by its key.

    :param project_key: the Jira project key (e.g. "PORT").
    :return: the decoded project JSON.
    :raises aiohttp.ClientResponseError: on a non-2xx response.
    """
    async with self._create_session() as session:
        async with session.get(
            f"{self.api_url}/project/{project_key}"
        ) as project_response:
            # Keep the response open until json() finishes; the previous
            # premature session.close() invalidated the pending body read.
            project_response.raise_for_status()
            return await project_response.json()

async def get_paginated_projects(
self,
self,
) -> AsyncGenerator[list[dict[str, Any]], None]:
logger.info("Getting projects from Jira")

Expand All @@ -117,9 +124,11 @@ async def get_paginated_projects(
params["startAt"] += PAGE_SIZE

async def get_single_issue(self, issue_key: str) -> dict[str, Any]:
    """Fetch a single Jira issue by its key.

    :param issue_key: the Jira issue key (e.g. "PORT-123").
    :return: the decoded issue JSON.
    :raises aiohttp.ClientResponseError: on a non-2xx response.
    """
    async with self._create_session() as session:
        async with session.get(
            f"{self.api_url}/issue/{issue_key}"
        ) as issue_response:
            # Response body must be read before the contexts exit; do not
            # close the session manually here.
            issue_response.raise_for_status()
            return await issue_response.json()

async def get_paginated_issues(self) -> AsyncGenerator[list[dict[str, Any]], None]:
logger.info("Getting issues from Jira")
Expand Down
44 changes: 20 additions & 24 deletions port_ocean/utils/repeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Callable, Coroutine, Any

from loguru import logger
from starlette.concurrency import run_in_threadpool

from port_ocean.utils.signal import signal_handler

Expand All @@ -15,10 +16,6 @@
]


async def run_in_threadpool(param: Callable[[], Any]) -> Any:
    """Run a blocking callable on the event loop's default thread pool.

    The previous stub was a no-op (`pass`), silently dropping every
    non-coroutine repeated function. The caller awaits this, matching the
    semantics of starlette's ``run_in_threadpool``.

    :param param: a zero-argument callable to execute off the event loop.
    :return: whatever ``param`` returns.
    """
    return await asyncio.get_running_loop().run_in_executor(None, param)

def repeat_every(
seconds: float,
wait_first: bool = False,
Expand Down Expand Up @@ -61,27 +58,26 @@ async def wrapped() -> None:
async def loop() -> None:
nonlocal repetitions

# if wait_first:
# await asyncio.sleep(seconds)
# count the repetition even if an exception is raised
repetitions += 1
try:
if is_coroutine:
task = asyncio.create_task(func())
signal_handler.register(lambda: task.cancel())
ensure_future(task)
else:
run_in_threadpool(func)
except Exception as exc:
formatted_exception = "".join(
format_exception(type(exc), exc, exc.__traceback__)
)
logger.error(formatted_exception)
if raise_exceptions:
raise exc
if wait_first:
await asyncio.sleep(seconds)
if max_repetitions is None or repetitions < max_repetitions:
asyncio.get_event_loop().call_later(seconds, loop)
while max_repetitions is None or repetitions < max_repetitions:
# count the repetition even if an exception is raised
repetitions += 1
try:
if is_coroutine:
task = asyncio.create_task(func())
signal_handler.register(lambda: task.cancel())
ensure_future(task)
else:
await run_in_threadpool(func)
except Exception as exc:
formatted_exception = "".join(
format_exception(type(exc), exc, exc.__traceback__)
)
logger.error(formatted_exception)
if raise_exceptions:
raise exc
await asyncio.sleep(seconds)

ensure_future(loop())

Expand Down

0 comments on commit 99db061

Please sign in to comment.