Skip to content

Commit

Permalink
Use Starlette
Browse files Browse the repository at this point in the history
  • Loading branch information
talsabagport committed Oct 31, 2024
1 parent 5318752 commit 6ae4877
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 85 deletions.
5 changes: 0 additions & 5 deletions integrations/jira/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ async def on_resync_issues(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
yield issues


@ocean.router.post("/webhook")
async def handle_webhook_request(data: dict[str, Any]) -> dict[str, Any]:
return {"ok": True}


# Called once when the integration starts.
@ocean.on_start()
async def on_start() -> None:
Expand Down
5 changes: 0 additions & 5 deletions port_ocean/context/ocean.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import Callable, TYPE_CHECKING, Any, Literal, Union

from fastapi import APIRouter
from pydantic.main import BaseModel
from werkzeug.local import LocalProxy

Expand Down Expand Up @@ -44,10 +43,6 @@ def initialized(self) -> bool:
def config(self) -> "IntegrationConfiguration":
return self.app.config

@property
def router(self) -> APIRouter:
return self.app.integration_router

@property
def integration(self) -> "BaseIntegration":
return self.app.integration
Expand Down
2 changes: 1 addition & 1 deletion port_ocean/exceptions/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc

from fastapi.responses import Response, PlainTextResponse
from port_ocean.exceptions.base import BaseOceanException
from starlette.responses import Response, PlainTextResponse


class BaseAPIException(BaseOceanException, abc.ABC):
Expand Down
107 changes: 52 additions & 55 deletions port_ocean/middlewares.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,70 @@
from typing import Callable, Awaitable

from fastapi import Request, Response
from loguru import logger
from starlette.requests import Request
from starlette.responses import Response

from port_ocean.exceptions.api import BaseAPIException, InternalServerException
from .context.event import event_context, EventType
from .context.ocean import ocean
from .utils.misc import get_time, generate_uuid
from starlette.middleware.base import (
BaseHTTPMiddleware,
RequestResponseEndpoint,
)


async def _handle_silently(
call_next: Callable[[Request], Awaitable[Response]], request: Request
) -> Response:
response: Response
try:
if request.url.path.startswith("/integration"):
async with event_context(EventType.HTTP_REQUEST, trigger_type="request"):
await ocean.integration.port_app_config_handler.get_port_app_config()
response = await call_next(request)
else:
response = await call_next(request)
class RequestHandlerMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
start_time = get_time(seconds_precision=False)
request_id = generate_uuid()

except BaseAPIException as ex:
response = ex.response()
if response.status_code < 500:
logger.bind(exception=str(ex)).info(
"Request did not succeed due to client-side error"
with logger.contextualize(request_id=request_id):
log_level = (
"DEBUG"
if request.url.path == "/docs" or request.url.path == "/openapi.json"
else "INFO"
)
else:
logger.opt(exception=True).warning(
"Request did not succeed due to server-side error"
logger.bind(url=str(request.url), method=request.method).log(
log_level, f"Request to {request.url.path} started"
)
response = await self._handle_silently(request, call_next)

except Exception:
logger.opt(exception=True).error("Request failed due to unexpected error")
response = InternalServerException().response()

return response
end_time = get_time(seconds_precision=False)
time_elapsed = round(end_time - start_time, 5)
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = str(time_elapsed)
logger.bind(
time_elapsed=time_elapsed, response_status=response.status_code
).log(log_level, f"Request to {request.url.path} ended")

return response

async def request_handler(
request: Request, call_next: Callable[[Request], Awaitable[Response]]
) -> Response:
"""Middleware used by FastAPI to process each request, featuring:
- Contextualize request logs with a unique Request ID (UUID4) for each unique request.
- Catch exceptions during the request handling. Translate custom API exceptions into responses,
or treat (and log) unexpected exceptions.
"""
start_time = get_time(seconds_precision=False)
request_id = generate_uuid()
async def _handle_silently(
self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
response: Response
try:
if request.url.path.startswith("/integration"):
async with event_context(
EventType.HTTP_REQUEST, trigger_type="request"
):
await ocean.integration.port_app_config_handler.get_port_app_config()
response = await call_next(request)
else:
response = await call_next(request)

with logger.contextualize(request_id=request_id):
log_level = (
"DEBUG"
if request.url.path == "/docs" or request.url.path == "/openapi.json"
else "INFO"
)
logger.bind(url=str(request.url), method=request.method).log(
log_level, f"Request to {request.url.path} started"
)
response = await _handle_silently(call_next, request)
except BaseAPIException as ex:
response = ex.response()
if response.status_code < 500:
logger.bind(exception=str(ex)).info(
"Request did not succeed due to client-side error"
)
else:
logger.opt(exception=True).warning(
"Request did not succeed due to server-side error"
)

end_time = get_time(seconds_precision=False)
time_elapsed = round(end_time - start_time, 5)
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = str(time_elapsed)
logger.bind(
time_elapsed=time_elapsed, response_status=response.status_code
).log(log_level, f"Request to {request.url.path} ended")
except Exception:
logger.opt(exception=True).error("Request failed due to unexpected error")
response = InternalServerException().response()

return response
38 changes: 20 additions & 18 deletions port_ocean/ocean.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import asyncio
import sys
import threading
from contextlib import asynccontextmanager
from typing import Callable, Any, Dict, AsyncIterator, Type
from typing import Callable, Any, Dict, Type

from fastapi import FastAPI, APIRouter
from loguru import logger
from pydantic import BaseModel
from starlette.middleware import Middleware
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette.types import Scope, Receive, Send

from port_ocean.clients.port.client import PortClient
Expand All @@ -22,25 +23,28 @@
from port_ocean.core.integrations.base import BaseIntegration
from port_ocean.core.models import Runtime
from port_ocean.log.sensetive import sensitive_log_filter
from port_ocean.middlewares import request_handler
from port_ocean.middlewares import RequestHandlerMiddleware
from port_ocean.utils.repeat import repeat_every
from port_ocean.utils.signal import signal_handler
from port_ocean.version import __integration_version__
import contextlib

from starlette.applications import Starlette


class Ocean:
def __init__(
self,
app: FastAPI | None = None,
app: Starlette | None = None,
integration_class: Callable[[PortOceanContext], BaseIntegration] | None = None,
integration_router: APIRouter | None = None,
integration_router: None = None,
config_factory: Type[BaseModel] | None = None,
config_override: Dict[str, Any] | None = None,
):
initialize_port_ocean_context(self)

@asynccontextmanager
async def lifecycle(_: FastAPI) -> AsyncIterator[None]:
@contextlib.asynccontextmanager
async def lifespan(app):
try:
await self.integration.start()
await self._setup_scheduled_resync()
Expand All @@ -51,9 +55,14 @@ async def lifecycle(_: FastAPI) -> AsyncIterator[None]:
finally:
signal_handler.exit()

self.fast_api_app = app or FastAPI(lifespan=lifecycle)
async def handle_webhook_request(data: dict[str, Any]) -> dict[str, Any]:
return JSONResponse({"ok": True})

# self.fast_api_app.middleware("http")(request_handler)
self.starlette_app = Starlette(
routes=[Route("/integration/webhook", endpoint=handle_webhook_request)],
middleware=[Middleware(RequestHandlerMiddleware)],
lifespan=lifespan,
)

self.config = IntegrationConfiguration(
# type: ignore
Expand All @@ -65,13 +74,6 @@ async def lifecycle(_: FastAPI) -> AsyncIterator[None]:
sensitive_log_filter.hide_sensitive_strings(
*self.config.get_sensitive_fields_data()
)
self.integration_router = integration_router or APIRouter()

@self.integration_router.post("/webhook")
async def handle_webhook_request(data: dict[str, Any]) -> dict[str, Any]:
return {"ok": True}

self.fast_api_app.include_router(self.integration_router, prefix="/integration")

self.port_client = PortClient(
base_url=self.config.port.base_url,
Expand Down Expand Up @@ -132,4 +134,4 @@ async def execute_resync_all() -> None:
await repeated_function()

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
await self.fast_api_app(scope, receive, send)
await self.starlette_app(scope, receive, send)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.12.2-dev19"
version = "0.12.2-dev20"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit 6ae4877

Please sign in to comment.