-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
배치 잡 스케줄러 (feat: 슬랙 멤버 동기화) #24
Changes from 7 commits
9cc7572
c49e62f
4d205ec
a1df55e
3264b76
b01ec78
1470257
0938bd8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,9 @@ boto3 = "^1.28.56" | |
pydantic-settings = "^2.0.3" | ||
httpx = "^0.25.0" | ||
pydantic = "^2.4.2" | ||
aiohttp = "^3.9.5" | ||
schedule = "^1.2.2" | ||
pytz = "^2024.1" | ||
Comment on lines
+25
to
+27
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. aiohttp - slack_sdk.web.async_client.AsyncWebClient 에 필요 |
||
|
||
[tool.poetry.group.dev.dependencies] | ||
pytest = "^7.4.2" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,34 +48,50 @@ def create_user(self, request: UserCreateUpdateRequest) -> UserCreateResponse: | |
raise UserAlreadyExistsException from exc | ||
return UserCreateResponse.from_orm(user) | ||
|
||
def create_users_from_slack(self, slack_members: list[SlackMember]) -> None: | ||
users = [ | ||
User( | ||
image_url=slack_member.profile.image_192, | ||
def create_or_update_users_from_slack( | ||
self, slack_members: list[SlackMember] | ||
) -> None: | ||
for slack_member in slack_members: | ||
self.create_or_update_user_from_slack(slack_member) | ||
|
||
def create_or_update_user_from_slack(self, slack_member: SlackMember) -> None: | ||
deleted_or_not_certified = ( | ||
slack_member.deleted | ||
or slack_member.is_bot | ||
or not slack_member.is_email_confirmed | ||
) | ||
|
||
if deleted_or_not_certified: | ||
return | ||
|
||
user = self.user_repository.get_user_by_slack_id(slack_member.id) | ||
|
||
if user is None: | ||
user = User( | ||
slack_id=slack_member.id, | ||
first_name=slack_member.profile.first_name or "", | ||
last_name=slack_member.profile.last_name or "", | ||
slack_email=slack_member.profile.email, | ||
first_name=slack_member.real_name or "", | ||
phone_number=slack_member.profile.phone or None, | ||
last_name="", | ||
image_url=slack_member.profile.image_192, | ||
github_id=slack_member.profile.github_id, | ||
# 추후 DB의 Position 테이블과 정합성 맞춘 후 추가 | ||
# position=slack_member.profile.position, | ||
generation=slack_member.profile.generation, | ||
is_member=True, | ||
) | ||
for slack_member in slack_members | ||
] | ||
|
||
for user in users: | ||
assert isinstance(user.slack_id, str) | ||
if ( | ||
created_user := self.user_repository.get_user_by_slack_id(user.slack_id) | ||
) is None: | ||
self.user_repository.create_user(user) | ||
continue | ||
|
||
created_user.slack_email = user.slack_email | ||
created_user.phone_number = user.phone_number | ||
created_user.image_url = user.image_url | ||
created_user.first_name = user.first_name | ||
created_user.last_name = user.last_name | ||
self.user_repository.update_user(created_user) | ||
self.user_repository.create_user(user) | ||
else: | ||
user.first_name = slack_member.profile.first_name or user.first_name | ||
user.last_name = slack_member.profile.last_name or user.last_name | ||
user.slack_email = slack_member.profile.email or user.slack_email | ||
user.phone_number = slack_member.profile.phone or user.phone_number | ||
user.image_url = slack_member.profile.image_192 or user.image_url | ||
user.github_id = slack_member.profile.github_id or user.github_id | ||
# user.position = slack_member.profile.position or user.position | ||
user.generation = slack_member.profile.generation or user.generation | ||
user.is_member = True | ||
self.user_repository.update_user(user) | ||
Comment on lines
+85
to
+94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. None 값이 기존 값을 덮어씌우지 않도록 개선 |
||
|
||
def update_user( | ||
self, user_id: int, request: UserCreateUpdateRequest | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
""" | ||
TODO : 추후 배치잡이 많아지거나 부하가 심하다고 판단되면 pod를 분리한다. | ||
""" | ||
|
||
import asyncio | ||
|
||
from schedule import Scheduler | ||
|
||
from waffledotcom.src.batch.slack.main import main as slack_main | ||
from waffledotcom.src.settings import settings | ||
|
||
|
||
async def schedule_tasks(): | ||
scheduler = Scheduler() | ||
if settings.is_dev: | ||
scheduler.every().saturday.at("00:00", "Asia/Seoul").do(slack_main) | ||
if settings.is_prod: | ||
scheduler.every().sunday.at("00:00", "Asia/Seoul").do(slack_main) | ||
Comment on lines
+15
to
+18
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 버그 발생 시 조기 탐지를 위해 dev에서 하루 먼저 실행 |
||
while True: | ||
# 최소 주기를 60초로 설정 | ||
await asyncio.sleep(60) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 비록 현재 유일한 배치잡인 슬랙 동기화 작업은 1주일마다 실행하지만, 실행 주기의 정밀도와 추후에 더 짧은 주기의 배치잡이 추가될 것을 감안하여 1분으로 설정 |
||
scheduler.run_pending() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,47 +2,30 @@ | |
|
||
from fastapi import Depends | ||
from loguru import logger | ||
from slack_sdk import WebClient | ||
from slack_sdk.errors import SlackApiError | ||
|
||
from waffledotcom.src.apps.user.services import UserService | ||
from waffledotcom.src.batch.slack.config import slack_config | ||
from waffledotcom.src.batch.slack.schema import SlackMember | ||
from waffledotcom.src.batch.slack.services import AsyncSlackApiService | ||
from waffledotcom.src.utils.dependency_solver import DependencySolver | ||
|
||
|
||
async def create_users_from_slack(user_service: UserService = Depends()): | ||
client = WebClient(token=slack_config.token) | ||
data = client.users_list().data | ||
|
||
assert isinstance(data, dict) | ||
|
||
if not data.get("ok", False): | ||
raise SlackApiError("Slack API Error", data) | ||
|
||
members_to_create = [] | ||
for member in data.get("members", []): | ||
if member["is_bot"] or member["deleted"] or member["id"] == "USLACKBOT": | ||
continue | ||
|
||
member = SlackMember(**member) | ||
if member.profile.phone is not None: | ||
phone = ( | ||
member.profile.phone.replace("-", "") | ||
.replace(" ", "") | ||
.replace("+82", "") | ||
) | ||
member.profile.phone = phone | ||
|
||
members_to_create.append(member) | ||
|
||
user_service.create_users_from_slack(members_to_create) | ||
async def create_users_from_slack( | ||
user_service: UserService = Depends(), | ||
slack_api_service: AsyncSlackApiService = Depends(), | ||
): | ||
members_to_create = await slack_api_service.get_members() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 멤버 조회 로직을 별도 서비스로 분리 |
||
for member in members_to_create: | ||
profile = await slack_api_service.get_profile(member.id) | ||
member.profile = profile | ||
user_service.create_or_update_users_from_slack(members_to_create) | ||
logger.debug(f"Created {len(members_to_create)} users from slack") | ||
|
||
|
||
def main(): | ||
solver = DependencySolver() | ||
asyncio.run(solver.run(create_users_from_slack)) | ||
try: | ||
asyncio.create_task(solver.run(create_users_from_slack)) | ||
except RuntimeError: | ||
asyncio.run(solver.run(create_users_from_slack)) | ||
Comment on lines
+25
to
+28
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. asyncio 루프 안에서 실행되는 케이스 대응 |
||
|
||
|
||
if __name__ == "__main__": | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,18 @@ | ||
from __future__ import annotations | ||
|
||
from pydantic import BaseModel | ||
import re | ||
from enum import StrEnum | ||
from urllib.parse import urlparse | ||
|
||
from pydantic import AliasPath, BaseModel, Field, field_validator | ||
|
||
PHONE_PATTERN = re.compile(r"^(010|011|016)\d{7,8}$") | ||
|
||
class SlackMember(BaseModel): | ||
id: str | ||
real_name: str | None = None | ||
profile: SlackMemberProfile | ||
deleted: bool | ||
is_bot: bool | ||
is_email_confirmed: bool | None = None | ||
is_admin: bool | None = None | ||
|
||
class CustomFieldId(StrEnum): | ||
GITHUB_ID = "Xf01UD0C7526" | ||
POSITION = "Xf01UD0AM3S6" | ||
GENERATION = "Xf02CN9EEQCD" | ||
|
||
|
||
class SlackMemberProfile(BaseModel): | ||
|
@@ -19,6 +21,42 @@ class SlackMemberProfile(BaseModel): | |
email: str | None = None | ||
phone: str | None = None | ||
image_192: str | None = None | ||
github_id: str | None = Field( | ||
None, validation_alias=AliasPath("fields", CustomFieldId.GITHUB_ID, "value") | ||
) | ||
position: str | None = Field( | ||
None, validation_alias=AliasPath("fields", CustomFieldId.POSITION, "value") | ||
) | ||
generation: str | None = Field( | ||
None, validation_alias=AliasPath("fields", CustomFieldId.GENERATION, "value") | ||
) | ||
|
||
@field_validator("phone") | ||
@classmethod | ||
def check_phone_number(cls, value: str | None) -> str | None: | ||
if value is not None: | ||
value = value.replace("-", "").replace(" ", "") | ||
if not PHONE_PATTERN.match(value): | ||
value = None | ||
return value | ||
|
||
@field_validator("github_id") | ||
@classmethod | ||
def check_github_id(cls, value: str | None) -> str | None: | ||
result = urlparse(value) | ||
assert result.scheme == "https", "Invalid URL scheme" | ||
assert result.netloc in ["github.com", "www.github.com"], "Invalid URL netloc" | ||
return str(result.path).split("/")[1] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 사용자가 입력한 깃헙 링크가 https://github.com/minkyu97 과 같은 형태일 것으로 가정 |
||
|
||
|
||
class SlackMember(BaseModel): | ||
id: str | ||
real_name: str | None = None | ||
profile: SlackMemberProfile | ||
deleted: bool | ||
is_bot: bool | ||
is_email_confirmed: bool | None = None | ||
is_admin: bool | None = None | ||
|
||
|
||
SlackMember.model_rebuild() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
import asyncio | ||
from collections.abc import Callable | ||
from typing import Any, Coroutine | ||
|
||
from slack_sdk.errors import SlackApiError | ||
from slack_sdk.web.async_client import AsyncWebClient | ||
from slack_sdk.web.async_slack_response import AsyncSlackResponse | ||
|
||
from waffledotcom.src.batch.slack.config import slack_config | ||
from waffledotcom.src.batch.slack.schema import SlackMember, SlackMemberProfile | ||
|
||
|
||
class AsyncSlackApiService: | ||
def __init__(self) -> None: | ||
self.client = AsyncWebClient(token=slack_config.token) | ||
|
||
async def get_members(self) -> list[SlackMember]: | ||
resp = await self.call_api_with_retry(self.client.users_list) | ||
|
||
if not resp.get("ok", False) or "members" not in resp: | ||
raise SlackApiError("Slack API Error", resp.data) | ||
|
||
members = [ | ||
SlackMember(**member) | ||
for member in resp.get("members", []) | ||
if not member["is_bot"] | ||
and not member["deleted"] | ||
and member["id"] != "USLACKBOT" | ||
] | ||
|
||
return members | ||
|
||
async def get_profile(self, user_key: str) -> SlackMemberProfile: | ||
resp = await self.call_api_with_retry( | ||
self.client.users_profile_get, | ||
kwargs={"user": user_key}, | ||
) | ||
|
||
if not resp.get("ok", False) or "profile" not in resp: | ||
raise SlackApiError("Slack API Error", resp.data) | ||
|
||
profile = SlackMemberProfile(**resp.get("profile", {})) | ||
|
||
return profile | ||
|
||
async def call_api_with_retry( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. slack api 는 사용량 제한이 있어서 꽤 높은 확률로 실패할 수 있음 |
||
self, | ||
f: Callable[..., Coroutine[Any, Any, AsyncSlackResponse]], | ||
*, | ||
retry: int = 10, | ||
args: list[Any] = [], | ||
kwargs: dict[str, Any] = {}, | ||
) -> AsyncSlackResponse: | ||
if getattr(self.client, f.__name__, None) != f: | ||
raise AttributeError(f"{f} is not a method of WebClient") | ||
|
||
exc = Exception("이게 실행된다면 버그이다.") | ||
|
||
for _ in range(retry): | ||
try: | ||
response = await f(*args, **kwargs) | ||
return response | ||
except Exception as e: | ||
exc = e | ||
await asyncio.sleep(10) | ||
|
||
raise exc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
기록) pre-commit 테스트 스텝에서 PyRight 가상환경 경로 탐색 실패 이슈로 추가