Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-authenticate if the session is closed by a concurrent request #1031

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions kopf/_cogs/clients/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ async def request(
)
await errors.check_response(response) # but do not parse it!

# aiohttp raises a generic error if the session/transport is closed, so we try to guess.
except RuntimeError as e:
if context.session.closed:
# NB: this will reset the retry counter and do the full cycle with the new creds.
# TODO: find a way to gracefully replace the active session in the existing context,
# so that all ongoing requests would switch to new session & credentials.
logger.error(f"Request attempt {idx} failed; will try re-authenticating: {what}")
raise errors.APISessionClosed("Session is closed.") from e
raise

except (aiohttp.ClientConnectionError, errors.APIServerError, asyncio.TimeoutError) as e:
if backoff is None: # i.e. the last or the only attempt.
logger.error(f"Request attempt {idx} failed; escalating: {what} -> {e!r}")
Expand Down
2 changes: 1 addition & 1 deletion kopf/_cogs/clients/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
async for key, info, context in vault.extended(APIContext, 'contexts'):
try:
return await fn(*args, **kwargs, context=context)
except errors.APIUnauthorizedError as e:
except (errors.APIUnauthorizedError, errors.APISessionClosed) as e:
await vault.invalidate(key, exc=e)

# Normally, either `vault.extended()` or `vault.invalidate()` raise the login errors.
Expand Down
13 changes: 13 additions & 0 deletions kopf/_cogs/clients/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,19 @@ class APIConflictError(APIClientError):
pass


class APISessionClosed(Exception):
"""
A helper to escalate from inside the requests to cause re-authentication.

This happens when credentials expire while multiple concurrent requests
are ongoing (including their retries, mostly their back-off timeouts):
one random request will raise HTTP 401 and cause the re-authentication,
while others will retry their requests with the old session (now closed!)
and get a generic RuntimeError from aiohttp, thus failing their whole task.
"""
pass


async def check_response(
response: aiohttp.ClientResponse,
) -> None:
Expand Down