Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

배치 잡 스케줄러 (feat: 슬랙 멤버 동기화) #24

Merged
merged 8 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,15 @@ jobs:
run: mysql -h 127.0.0.1 -P 3307 -u root -p'root-password' -e "ALTER DATABASE testdb CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"

- name: Install poetry
run: pipx install poetry
run: |
pipx install poetry
poetry config virtualenvs.in-project true --local
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

기록) pre-commit 테스트 스텝에서 PyRight 가상환경 경로 탐색 실패 이슈로 추가


- uses: actions/setup-python@v4
with:
python-version: 3.11
cache: poetry

- name: Set poetry environment
run: |
poetry env use 3.11

- name: Install dependencies
run: poetry install --no-root

Expand Down
550 changes: 467 additions & 83 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ boto3 = "^1.28.56"
pydantic-settings = "^2.0.3"
httpx = "^0.25.0"
pydantic = "^2.4.2"
aiohttp = "^3.9.5"
schedule = "^1.2.2"
pytz = "^2024.1"
Comment on lines +25 to +27
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aiohttp - slack_sdk.web.async_client.AsyncWebClient 에 필요
schedule, pytz - 배치 잡 스케줄링에 필요


[tool.poetry.group.dev.dependencies]
pytest = "^7.4.2"
Expand Down
12 changes: 12 additions & 0 deletions waffledotcom/src/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import asyncio

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from waffledotcom.src.apps.router import api_router
from waffledotcom.src.batch.scheduler import schedule_tasks
from waffledotcom.src.database.connection import DBSessionFactory
from waffledotcom.src.settings import settings

Expand Down Expand Up @@ -29,6 +32,14 @@ def on_shutdown():
return on_shutdown


def _register_startup_event(app: FastAPI):
@app.on_event("startup")
def on_startup():
asyncio.create_task(schedule_tasks())

return on_startup


def create_app() -> FastAPI:
app = FastAPI(
title="waffledotcom-server",
Expand All @@ -40,4 +51,5 @@ def create_app() -> FastAPI:
_add_middlewares(app)
_add_routers(app)
_register_shutdown_event(app)
_register_startup_event(app)
return app
62 changes: 39 additions & 23 deletions waffledotcom/src/apps/user/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,34 +48,50 @@ def create_user(self, request: UserCreateUpdateRequest) -> UserCreateResponse:
raise UserAlreadyExistsException from exc
return UserCreateResponse.from_orm(user)

def create_users_from_slack(self, slack_members: list[SlackMember]) -> None:
users = [
User(
image_url=slack_member.profile.image_192,
def create_or_update_users_from_slack(
self, slack_members: list[SlackMember]
) -> None:
for slack_member in slack_members:
self.create_or_update_user_from_slack(slack_member)

def create_or_update_user_from_slack(self, slack_member: SlackMember) -> None:
deleted_or_not_certified = (
slack_member.deleted
or slack_member.is_bot
or not slack_member.is_email_confirmed
)

if deleted_or_not_certified:
return

user = self.user_repository.get_user_by_slack_id(slack_member.id)

if user is None:
user = User(
slack_id=slack_member.id,
first_name=slack_member.profile.first_name or "",
last_name=slack_member.profile.last_name or "",
slack_email=slack_member.profile.email,
first_name=slack_member.real_name or "",
phone_number=slack_member.profile.phone or None,
last_name="",
image_url=slack_member.profile.image_192,
github_id=slack_member.profile.github_id,
# 추후 DB의 Position 테이블과 정합성 맞춘 후 추가
# position=slack_member.profile.position,
generation=slack_member.profile.generation,
is_member=True,
)
for slack_member in slack_members
]

for user in users:
assert isinstance(user.slack_id, str)
if (
created_user := self.user_repository.get_user_by_slack_id(user.slack_id)
) is None:
self.user_repository.create_user(user)
continue

created_user.slack_email = user.slack_email
created_user.phone_number = user.phone_number
created_user.image_url = user.image_url
created_user.first_name = user.first_name
created_user.last_name = user.last_name
self.user_repository.update_user(created_user)
self.user_repository.create_user(user)
else:
user.first_name = slack_member.profile.first_name or user.first_name
user.last_name = slack_member.profile.last_name or user.last_name
user.slack_email = slack_member.profile.email or user.slack_email
user.phone_number = slack_member.profile.phone or user.phone_number
user.image_url = slack_member.profile.image_192 or user.image_url
user.github_id = slack_member.profile.github_id or user.github_id
# user.position = slack_member.profile.position or user.position
user.generation = slack_member.profile.generation or user.generation
user.is_member = True
self.user_repository.update_user(user)
Comment on lines +85 to +94
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None 값이 기존 값을 덮어씌우지 않도록 개선


def update_user(
self, user_id: int, request: UserCreateUpdateRequest
Expand Down
22 changes: 22 additions & 0 deletions waffledotcom/src/batch/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
TODO : 추후 배치잡이 많아지거나 부하가 심하다고 판단되면 pod를 분리한다.
"""

import asyncio

from schedule import Scheduler

from waffledotcom.src.batch.slack.main import main as slack_main
from waffledotcom.src.settings import settings


async def schedule_tasks():
scheduler = Scheduler()
if settings.is_dev:
scheduler.every().saturday.at("00:00", "Asia/Seoul").do(slack_main)
if settings.is_prod:
scheduler.every().sunday.at("00:00", "Asia/Seoul").do(slack_main)
Comment on lines +15 to +18
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

버그 발생 시 조기 탐지를 위해 dev에서 하루 먼저 실행

while True:
# 최소 주기를 60초로 설정
await asyncio.sleep(60)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

비록 현재 유일한 배치잡인 슬랙 동기화 작업은 1주일마다 실행하지만, 실행 주기의 정밀도와 추후에 더 짧은 주기의 배치잡이 추가될 것을 감안하여 1분으로 설정
실행할 잡이 없을 때 run_pending 은 충분히 간단한 작업이므로 1분이면 충분히 길다고 판단됨

scheduler.run_pending()
45 changes: 14 additions & 31 deletions waffledotcom/src/batch/slack/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,30 @@

from fastapi import Depends
from loguru import logger
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

from waffledotcom.src.apps.user.services import UserService
from waffledotcom.src.batch.slack.config import slack_config
from waffledotcom.src.batch.slack.schema import SlackMember
from waffledotcom.src.batch.slack.services import AsyncSlackApiService
from waffledotcom.src.utils.dependency_solver import DependencySolver


async def create_users_from_slack(user_service: UserService = Depends()):
client = WebClient(token=slack_config.token)
data = client.users_list().data

assert isinstance(data, dict)

if not data.get("ok", False):
raise SlackApiError("Slack API Error", data)

members_to_create = []
for member in data.get("members", []):
if member["is_bot"] or member["deleted"] or member["id"] == "USLACKBOT":
continue

member = SlackMember(**member)
if member.profile.phone is not None:
phone = (
member.profile.phone.replace("-", "")
.replace(" ", "")
.replace("+82", "")
)
member.profile.phone = phone

members_to_create.append(member)

user_service.create_users_from_slack(members_to_create)
async def create_users_from_slack(
user_service: UserService = Depends(),
slack_api_service: AsyncSlackApiService = Depends(),
):
members_to_create = await slack_api_service.get_members()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

멤버 조회 로직을 별도 서비스로 분리

for member in members_to_create:
profile = await slack_api_service.get_profile(member.id)
member.profile = profile
user_service.create_or_update_users_from_slack(members_to_create)
logger.debug(f"Created {len(members_to_create)} users from slack")


def main():
solver = DependencySolver()
asyncio.run(solver.run(create_users_from_slack))
try:
asyncio.create_task(solver.run(create_users_from_slack))
except RuntimeError:
asyncio.run(solver.run(create_users_from_slack))
Comment on lines +25 to +28
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asyncio 루프 안에서 실행되는 케이스 대응



if __name__ == "__main__":
Expand Down
56 changes: 47 additions & 9 deletions waffledotcom/src/batch/slack/schema.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
from __future__ import annotations

from pydantic import BaseModel
import re
from enum import StrEnum
from urllib.parse import urlparse

from pydantic import AliasPath, BaseModel, Field, field_validator

PHONE_PATTERN = re.compile(r"^(010|011|016)\d{7,8}$")

class SlackMember(BaseModel):
id: str
real_name: str | None = None
profile: SlackMemberProfile
deleted: bool
is_bot: bool
is_email_confirmed: bool | None = None
is_admin: bool | None = None

class CustomFieldId(StrEnum):
GITHUB_LINK = "Xf01UD0C7526"
POSITION = "Xf01UD0AM3S6"
GENERATION = "Xf02CN9EEQCD"
Comment on lines +12 to +15
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

team.profile.get API 를 통해 알아낸 값이며, 운영팀에서 해당 필드들을 삭제하기 전까지 변하지 않는 값들이므로 enum 으로 박아놓음



class SlackMemberProfile(BaseModel):
Expand All @@ -19,6 +21,42 @@ class SlackMemberProfile(BaseModel):
email: str | None = None
phone: str | None = None
image_192: str | None = None
github_id: str | None = Field(
None, validation_alias=AliasPath("fields", CustomFieldId.GITHUB_LINK, "value")
)
position: str | None = Field(
None, validation_alias=AliasPath("fields", CustomFieldId.POSITION, "value")
)
generation: str | None = Field(
None, validation_alias=AliasPath("fields", CustomFieldId.GENERATION, "value")
)
Comment on lines +24 to +32
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

커스텀 필드이지만 fields 로 묶여있을 이유가 없다고 판단되어 Profile 클래스에 포함되도록 평탄화


@field_validator("phone")
@classmethod
def check_phone_number(cls, value: str | None) -> str | None:
if value is not None:
value = value.replace("-", "").replace(" ", "")
if not PHONE_PATTERN.match(value):
value = None
return value

@field_validator("github_id")
@classmethod
def check_github_id(cls, value: str | None) -> str | None:
result = urlparse(value)
assert result.scheme == "https", "Invalid URL scheme"
assert result.netloc in ["github.com", "www.github.com"], "Invalid URL netloc"
return str(result.path).split("/")[1]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

사용자가 입력한 깃헙 링크가 https://github.com/minkyu97 과 같은 형태일 것으로 가정



class SlackMember(BaseModel):
id: str
real_name: str | None = None
profile: SlackMemberProfile
deleted: bool
is_bot: bool
is_email_confirmed: bool | None = None
is_admin: bool | None = None


SlackMember.model_rebuild()
67 changes: 67 additions & 0 deletions waffledotcom/src/batch/slack/services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import asyncio
from collections.abc import Callable
from typing import Any, Coroutine

from slack_sdk.errors import SlackApiError
from slack_sdk.web.async_client import AsyncWebClient
from slack_sdk.web.async_slack_response import AsyncSlackResponse

from waffledotcom.src.batch.slack.config import slack_config
from waffledotcom.src.batch.slack.schema import SlackMember, SlackMemberProfile


class AsyncSlackApiService:
def __init__(self) -> None:
self.client = AsyncWebClient(token=slack_config.token)

async def get_members(self) -> list[SlackMember]:
resp = await self.call_api_with_retry(self.client.users_list)

if not resp.get("ok", False) or "members" not in resp:
raise SlackApiError("Slack API Error", resp.data)

members = [
SlackMember(**member)
for member in resp.get("members", [])
if not member["is_bot"]
and not member["deleted"]
and member["id"] != "USLACKBOT"
]

return members

async def get_profile(self, user_key: str) -> SlackMemberProfile:
resp = await self.call_api_with_retry(
self.client.users_profile_get,
kwargs={"user": user_key},
)

if not resp.get("ok", False) or "profile" not in resp:
raise SlackApiError("Slack API Error", resp.data)

profile = SlackMemberProfile(**resp.get("profile", {}))

return profile

async def call_api_with_retry(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slack api 는 사용량 제한이 있어서 꽤 높은 확률로 실패할 수 있음
그 경우 정해진 retry 횟수만큼 10초 기다렸다가 재시도를 반복하는 로직 추가

self,
f: Callable[..., Coroutine[Any, Any, AsyncSlackResponse]],
*,
retry: int = 10,
args: list[Any] = [],
kwargs: dict[str, Any] = {},
) -> AsyncSlackResponse:
if getattr(self.client, f.__name__, None) != f:
raise AttributeError(f"{f} is not a method of WebClient")

exc = Exception("이게 실행된다면 버그이다.")

for _ in range(retry):
try:
response = await f(*args, **kwargs)
return response
except Exception as e:
exc = e
await asyncio.sleep(10)

raise exc
Loading