generated from canonical/is-charms-template-repo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebhook_redelivery.py
406 lines (331 loc) · 14.2 KB
/
webhook_redelivery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
# Copyright 2025 Canonical Ltd.
# See LICENSE file for licensing details.
"""Redeliver failed webhooks since a given time.
Only webhooks with action type queued are redelivered (as the others are not routable).
"""
import argparse
import json
import logging
import os
import sys
from collections import namedtuple
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from functools import wraps
from typing import Callable, Iterator, ParamSpec, TypeVar
from github import BadCredentialsException, Github, GithubException, RateLimitExceededException
from github.Auth import AppAuth, AppInstallationAuth, Token
from pydantic import BaseModel
from webhook_router.app import SUPPORTED_GITHUB_EVENT
from webhook_router.router import ROUTABLE_JOB_STATUS
# Process exit code used by the script entry point when an ArgParseError is raised.
ARG_PARSE_ERROR_EXIT_CODE = 1
# Process exit code used by the script entry point when a RedeliveryError is raised.
REDELIVERY_ERROR_EXIT_CODE = 2
# Names of the environment variables the authentication details are read from.
# Either the token variable or the complete set of GitHub App variables is expected.
GITHUB_TOKEN_ENV_NAME = "GITHUB_TOKEN"
GITHUB_APP_CLIENT_ID_ENV_NAME = "GITHUB_APP_CLIENT_ID"
GITHUB_APP_INSTALLATION_ID_ENV_NAME = "GITHUB_APP_INSTALLATION_ID"
GITHUB_APP_PRIVATE_KEY_ENV_NAME = "GITHUB_APP_PRIVATE_KEY"
# Delivery status that is treated as successful; any other status is considered failed.
OK_STATUS = "OK"
# Generic parameters used to type the GitHub API exception handling decorator.
P = ParamSpec("P")
R = TypeVar("R")
# Module level logger.
logger = logging.getLogger(__name__)
class GithubAppAuthDetails(BaseModel):
    """Credentials needed to authenticate with Github as a Github App installation.

    Attributes:
        client_id: The client ID of the Github App.
        installation_id: The ID of the Github App installation to act as.
        private_key: The private key of the Github App.
    """

    client_id: str
    installation_id: int
    private_key: str
# A Github token is represented as a plain string.
GithubToken = str
# Authentication is done either with Github App details or with a token.
GithubAuthDetails = GithubAppAuthDetails | GithubToken
# Result of the argument parsing: the look-back window in seconds ("since"), the
# authentication details and the address of the webhook.
_ParsedArgs = namedtuple("_ParsedArgs", ["since", "github_auth_details", "webhook_address"])
@dataclass
class WebhookAddress:
    """Location of a webhook on Github.

    Attributes:
        github_org: The Github organisation the webhook is registered in.
        github_repo: The Github repository the webhook is registered in. None for webhooks
            registered on the organisation itself.
        id: The identifier of the webhook.
    """

    github_org: str
    github_repo: str | None
    id: int
@dataclass
class _WebhookDeliveryAttempt:
    """A single attempt to deliver a webhook.

    Attributes:
        id: The identifier of the delivery.
        status: The status of the delivery.
        delivered_at: The point in time at which the delivery was made.
        action: The action type of the delivery.
            See https://docs.github.com/en/webhooks/webhook-events-and-payloads#workflow_job for
            possible values for the workflow_job event. May be null for other events (e.g. ping).
        event: The event type of the delivery.
    """

    id: int
    status: str
    delivered_at: datetime
    action: str | None
    event: str
class RedeliveryError(Exception):
    """Signal that the redelivery of webhooks failed."""
class ArgParseError(Exception):
    """Signal that the arguments passed to the script could not be parsed."""
def main() -> None:  # pragma: no cover this is checked by integration tests
    """Run the module as script.

    Parses the arguments, redelivers the failed webhook deliveries and prints the number of
    redelivered webhooks as a JSON object to stdout.
    """
    parsed = _arg_parsing()
    result = {
        "redelivered": _redeliver_failed_webhook_delivery_attempts(
            github_auth=parsed.github_auth_details,
            webhook_address=parsed.webhook_address,
            since_seconds=parsed.since,
        )
    }
    print(json.dumps(result))
def _arg_parsing() -> _ParsedArgs:  # pragma: no cover this is checked by integration tests
    """Parse the command line arguments and the authentication environment variables.

    The look-back window, the Github path and the webhook ID are read from the command line.
    The authentication details are read from the environment: either a Github token or the
    complete set of Github App details has to be provided, but not both.

    Raises:
        ArgParseError: If the arguments are invalid.

    Returns:
        The parsed arguments.
    """
    parser = argparse.ArgumentParser(
        description=f"{__doc__}. The script returns the amount of redelivered webhooks in JSON"
        " format. The script assumes github authentication details to be given via environment"
        " variables. Depending on the authentication method, the script expects the environment"
        f" variable {GITHUB_TOKEN_ENV_NAME} to be passed for token based authentication, or"
        f" {GITHUB_APP_CLIENT_ID_ENV_NAME}, {GITHUB_APP_INSTALLATION_ID_ENV_NAME},"
        f" {GITHUB_APP_PRIVATE_KEY_ENV_NAME} for GitHub App authentication."
    )
    parser.add_argument(
        "--since",
        type=int,
        help="The amount of seconds to look back for failed deliveries.",
        required=True,
    )
    parser.add_argument(
        "--github-path",
        type=str,
        help=(
            "The path of the organisation or repository where the webhooks are registered. Should"
            " be in the format of <organisation> or <organisation>/<repository>."
        ),
        required=True,
    )
    parser.add_argument(
        "--webhook-id",
        type=int,
        help="The identifier of the webhook for which delivery attempts are being checked.",
        required=True,
    )
    args = parser.parse_args()

    github_app_client_id = os.getenv(GITHUB_APP_CLIENT_ID_ENV_NAME)
    github_app_installation_id = os.getenv(GITHUB_APP_INSTALLATION_ID_ENV_NAME)
    github_app_private_key = os.getenv(GITHUB_APP_PRIVATE_KEY_ENV_NAME)
    github_token = os.getenv(GITHUB_TOKEN_ENV_NAME)

    # Summary of what was found in the environment, appended to the error messages below.
    # Secrets are masked. This must be a plain string: a trailing comma inside the parentheses
    # would turn it into a one-element tuple and garble the error messages.
    got_str = (
        f" Got {GITHUB_APP_CLIENT_ID_ENV_NAME} = {github_app_client_id},"
        f" {GITHUB_APP_INSTALLATION_ID_ENV_NAME} = {github_app_installation_id},"
        f" {GITHUB_APP_PRIVATE_KEY_ENV_NAME} = {'***' if github_app_private_key else None},"
        f" {GITHUB_TOKEN_ENV_NAME} = {'***' if github_token else None}"
    )

    # Mixing both authentication methods is ambiguous and therefore rejected.
    if github_token and (
        github_app_client_id or github_app_installation_id or github_app_private_key
    ):
        raise ArgParseError(
            "Github auth details are specified in two ways. "
            "Please specify only one of github token or github app auth details."
            f"{got_str}"
        )

    github_auth_details: GithubAuthDetails
    if github_token:
        github_auth_details = github_token
    elif github_app_client_id and github_app_installation_id and github_app_private_key:
        try:
            github_auth_details = GithubAppAuthDetails(
                client_id=github_app_client_id,
                # The installation ID arrives as a string from the environment.
                installation_id=int(github_app_installation_id),
                private_key=github_app_private_key,
            )
        except ValueError as exc:
            raise ArgParseError(f"Failed to parse github auth details: {exc}") from exc
    else:
        raise ArgParseError(
            "Github auth details are not specified completely. "
            "Am missing github token or complete set of app auth parameters."
            f"{got_str}"
        )

    # <organisation> or <organisation>/<repository>; the repository part is optional.
    path_parts = args.github_path.split("/")
    webhook_address = WebhookAddress(
        github_org=path_parts[0],
        github_repo=path_parts[1] if len(path_parts) > 1 else None,
        id=args.webhook_id,
    )
    return _ParsedArgs(
        since=args.since, github_auth_details=github_auth_details, webhook_address=webhook_address
    )
def _github_api_exc_decorator(func: Callable[P, R]) -> Callable[P, R]:
    """Decorator to handle GitHub API exceptions.

    Args:
        func: The function whose Github API exceptions should be translated.

    Returns:
        The wrapped function, which raises RedeliveryError instead of Github API exceptions.
    """

    @wraps(func)
    def _wrapper(*posargs: P.args, **kwargs: P.kwargs) -> R:
        """Wrap the function to handle Github API exceptions.

        Catch Github API exceptions and raise an appropriate RedeliveryError instead.

        Raises:
            RedeliveryError: If an error occurs during redelivery.

        Returns:
            The result of the origin function when no github error occurs.
        """
        # The module logger is used (rather than the root logger via logging.error) so the
        # records carry this module's name and no implicit logging.basicConfig() is triggered.
        # The specific handlers come before the generic GithubException handler.
        try:
            return func(*posargs, **kwargs)
        except BadCredentialsException as exc:
            logger.error("Github client credentials error: %s", exc, exc_info=exc)
            raise RedeliveryError(
                "The github client returned a Bad Credential error, "
                "please ensure credentials are set and have proper access rights."
            ) from exc
        except RateLimitExceededException as exc:
            logger.error("Github rate limit exceeded error: %s", exc, exc_info=exc)
            raise RedeliveryError(
                "The github client is returning a Rate Limit Exceeded error, "
                "please wait before retrying."
            ) from exc
        except GithubException as exc:
            logger.error("Github API error: %s", exc, exc_info=exc)
            raise RedeliveryError(
                "The github client encountered an error. Please have a look at the logs."
            ) from exc

    return _wrapper
@_github_api_exc_decorator
def _redeliver_failed_webhook_delivery_attempts(
    github_auth: GithubAuthDetails, webhook_address: WebhookAddress, since_seconds: int
) -> int:
    """Redeliver the webhook deliveries that failed within the last since_seconds seconds.

    Args:
        github_auth: The GitHub authentication details used to interact with the Github API.
        webhook_address: The data to identify the webhook.
        since_seconds: The amount of seconds to look back for failed deliveries.

    Returns:
        The number of failed webhook deliveries redelivered.
    """
    client = _get_github_client(github_auth)
    all_attempts = _iter_delivery_attempts(github_client=client, webhook_address=webhook_address)
    # The look-back window is anchored at the current time in UTC.
    cutoff = datetime.now(tz=timezone.utc) - timedelta(seconds=since_seconds)
    failed_attempts = _filter_for_failed_attempts_since(
        deliveries=all_attempts, since_datetime=cutoff
    )
    return _redeliver_attempts(
        deliveries=failed_attempts, github_client=client, webhook_address=webhook_address
    )
# Github App authentication is not tested in unit tests, but in integration tests.
def _get_github_client(github_auth: GithubAuthDetails) -> Github:  # pragma: no cover
    """Create a Github client for the given authentication details.

    Args:
        github_auth: The Github authentication details, either a token or Github App details.

    Returns:
        The Github client.
    """
    if not isinstance(github_auth, GithubToken):
        # Github App details: authenticate as the app first, then as its installation.
        app_credentials = AppAuth(
            app_id=github_auth.client_id, private_key=github_auth.private_key
        )
        installation_credentials = AppInstallationAuth(
            app_auth=app_credentials, installation_id=github_auth.installation_id
        )
        return Github(auth=installation_credentials)
    return Github(auth=Token(github_auth))
def _iter_delivery_attempts(
    github_client: Github, webhook_address: WebhookAddress
) -> Iterator[_WebhookDeliveryAttempt]:
    """Yield the delivery attempts of a webhook.

    The webhook is looked up on the repository if one is given, otherwise on the organisation.

    Args:
        github_client: The GitHub client used to interact with the Github API.
        webhook_address: The data to identify the webhook.

    Raises:
        AssertionError: If a delivery has no value for one of the required fields.

    Yields:
        The delivery attempts in the order the Github client returns them.
    """
    hook_owner = (
        github_client.get_repo(f"{webhook_address.github_org}/{webhook_address.github_repo}")
        if webhook_address.github_repo
        else github_client.get_organization(webhook_address.github_org)
    )
    # pygithub does not validate that the API returns these fields with non-null values,
    # so this is checked here for every delivery.
    mandatory_fields = {"id", "status", "delivered_at", "event"}
    for raw_delivery in hook_owner.get_hook_deliveries(webhook_address.id):
        missing_fields = {
            name for name in mandatory_fields if getattr(raw_delivery, name, None) is None
        }
        if missing_fields:
            raise AssertionError(
                f"The webhook delivery {raw_delivery.raw_data} is missing required fields:"
                f" {missing_fields}"
            )
        yield _WebhookDeliveryAttempt(
            id=raw_delivery.id,  # type: ignore
            status=raw_delivery.status,  # type: ignore
            delivered_at=raw_delivery.delivered_at,  # type: ignore
            action=raw_delivery.action,  # type: ignore
            event=raw_delivery.event,  # type: ignore
        )
def _filter_for_failed_attempts_since(
    deliveries: Iterator[_WebhookDeliveryAttempt], since_datetime: datetime
) -> Iterator[_WebhookDeliveryAttempt]:
    """Yield the failed, routable delivery attempts made since a given time.

    Iteration stops at the first delivery that is older than since_datetime; deliveries after
    it are not inspected.
    NOTE(review): this presumably relies on the deliveries arriving newest first - confirm.

    Args:
        deliveries: The webhook delivery attempts.
        since_datetime: The time to look back for failed deliveries.

    Yields:
        The attempts whose status is not OK and whose action and event type are routable.
    """
    for attempt in deliveries:
        if attempt.delivered_at < since_datetime:
            return
        if attempt.status == OK_STATUS:
            continue
        if attempt.action != ROUTABLE_JOB_STATUS or attempt.event != SUPPORTED_GITHUB_EVENT:
            continue
        yield attempt
def _redeliver_attempts(
    deliveries: Iterator[_WebhookDeliveryAttempt],
    github_client: Github,
    webhook_address: WebhookAddress,
) -> int:
    """Redeliver every given webhook delivery attempt and count them.

    Args:
        deliveries: The webhook delivery attempts.
        github_client: The GitHub client used to interact with the Github API.
        webhook_address: The data to identify the webhook.

    Returns:
        The number of failed webhook deliveries redelivered.
    """
    redelivered = 0
    # enumerate keeps the running count; it stays 0 when there is nothing to redeliver.
    for redelivered, attempt in enumerate(deliveries, start=1):
        _redeliver_attempt(
            github_client=github_client, webhook_address=webhook_address, delivery_id=attempt.id
        )
    return redelivered
def _redeliver_attempt(
    github_client: Github, webhook_address: WebhookAddress, delivery_id: int
) -> None:
    """Request the redelivery of a single webhook delivery.

    Args:
        github_client: The GitHub client used to interact with the Github API.
        webhook_address: The data to identify the webhook.
        delivery_id: The identifier of the webhook delivery to redeliver.
    """
    # Repository webhooks and organisation webhooks live under different API paths.
    if webhook_address.github_repo:
        hook_owner_path = f"/repos/{webhook_address.github_org}/{webhook_address.github_repo}"
    else:
        hook_owner_path = f"/orgs/{webhook_address.github_org}"
    # pygithub doesn't support the endpoint so we have to use the requester directly to perform
    # a raw request: https://pygithub.readthedocs.io/en/stable/utilities.html#raw-requests
    github_client.requester.requestJsonAndCheck(
        "POST", f"{hook_owner_path}/hooks/{webhook_address.id}/deliveries/{delivery_id}/attempts"
    )
if __name__ == "__main__":  # pragma: no cover this is checked by integration tests
    # Each error category is mapped to its own exit code.
    try:
        main()
    except ArgParseError as arg_error:
        # Invalid arguments: report the message on stderr without a traceback.
        print(f"{arg_error}", file=sys.stderr)
        sys.exit(ARG_PARSE_ERROR_EXIT_CODE)
    except RedeliveryError as redelivery_error:
        # Failed redelivery: log the message including the traceback.
        logger.exception("Webhook redelivery failed: %s", redelivery_error)
        sys.exit(REDELIVERY_ERROR_EXIT_CODE)