diff --git a/mondey_backend/src/mondey_backend/main.py b/mondey_backend/src/mondey_backend/main.py index 324cb891..ab137560 100644 --- a/mondey_backend/src/mondey_backend/main.py +++ b/mondey_backend/src/mondey_backend/main.py @@ -24,8 +24,8 @@ from .routers import questions from .routers import research from .routers import users -from .routers.utils import recompute_milestone_statistics -from .routers.utils import recompute_milestonegroup_statistics +from .routers.statistics import recompute_milestone_statistics +from .routers.statistics import recompute_milestonegroup_statistics from .settings import app_settings diff --git a/mondey_backend/src/mondey_backend/models/milestones.py b/mondey_backend/src/mondey_backend/models/milestones.py index fa85fa4d..657d0193 100644 --- a/mondey_backend/src/mondey_backend/models/milestones.py +++ b/mondey_backend/src/mondey_backend/models/milestones.py @@ -199,6 +199,7 @@ class MilestoneAgeScore(SQLModel, table=True): default=None, foreign_key="milestoneagescorecollection.id" ) collection: MilestoneAgeScoreCollection = back_populates("scores") + count: int avg_score: float stddev_score: float age_months: int @@ -206,12 +207,16 @@ class MilestoneAgeScore(SQLModel, table=True): class MilestoneAgeScorePublic(SQLModel): - def __init__(self, avg_score=0, stddev_score=0, age_months=0, expected_score=0): + def __init__( + self, count=0, avg_score=0, stddev_score=0, age_months=0, expected_score=0 + ): + self.count = count self.avg_score = avg_score self.stddev_score = stddev_score self.age_months = age_months self.expected_score = expected_score + count: int avg_score: float stddev_score: float age_months: int @@ -249,6 +254,7 @@ class MilestoneGroupAgeScore(SQLModel, table=True): default=None, foreign_key="milestonegroupagescorecollection.id" ) collection: MilestoneGroupAgeScoreCollection = back_populates("scores") + count: int avg_score: float stddev_score: float age_months: int @@ -258,12 +264,16 @@ class 
MilestoneGroupAgeScore(SQLModel, table=True): class MilestoneGroupAgeScorePublic(SQLModel): - def __init__(self, avg_score=0, stddev_score=0, age_months=0, milestonegroup_id=0): + def __init__( + self, count=0, avg_score=0, stddev_score=0, age_months=0, milestonegroup_id=0 + ): + self.count = count self.avg_score = avg_score self.stddev_score = stddev_score self.age_months = age_months self.milestonegroup_id = milestonegroup_id + count: int avg_score: float stddev_score: float age_months: int diff --git a/mondey_backend/src/mondey_backend/routers/admin_routers/milestones.py b/mondey_backend/src/mondey_backend/routers/admin_routers/milestones.py index 06b4e2cb..6f048516 100644 --- a/mondey_backend/src/mondey_backend/routers/admin_routers/milestones.py +++ b/mondey_backend/src/mondey_backend/routers/admin_routers/milestones.py @@ -9,7 +9,6 @@ from ...models.milestones import Language from ...models.milestones import Milestone from ...models.milestones import MilestoneAdmin -from ...models.milestones import MilestoneAgeScoreCollection from ...models.milestones import MilestoneGroup from ...models.milestones import MilestoneGroupAdmin from ...models.milestones import MilestoneGroupText @@ -19,7 +18,6 @@ from ...models.milestones import SubmittedMilestoneImagePublic from ...models.utils import ItemOrder from ..utils import add -from ..utils import calculate_milestone_statistics_by_age from ..utils import get from ..utils import milestone_group_image_path from ..utils import milestone_image_path @@ -180,10 +178,10 @@ async def delete_submitted_milestone_image( session.commit() return {"ok": True} - @router.get("/milestone-age-scores/{milestone_id}") - def get_milestone_age_scores( - session: SessionDep, milestone_id: int - ) -> MilestoneAgeScoreCollection: - return calculate_milestone_statistics_by_age(session, milestone_id) + # @router.get("/milestone-age-scores/{milestone_id}") + # def get_milestone_age_scores( + # session: SessionDep, milestone_id: int + # ) -> 
MilestoneAgeScoreCollection: + # return calculate_milestone_statistics_by_age(session, milestone_id) return router diff --git a/mondey_backend/src/mondey_backend/routers/statistics.py b/mondey_backend/src/mondey_backend/routers/statistics.py new file mode 100644 index 00000000..412855ab --- /dev/null +++ b/mondey_backend/src/mondey_backend/routers/statistics.py @@ -0,0 +1,292 @@ +from __future__ import annotations + +import datetime +from collections.abc import Sequence + +import numpy as np +from sqlmodel import col +from sqlmodel import select + +from ..dependencies import SessionDep +from ..models.milestones import Milestone +from ..models.milestones import MilestoneAgeScore +from ..models.milestones import MilestoneAgeScoreCollection +from ..models.milestones import MilestoneAnswer +from ..models.milestones import MilestoneAnswerSession +from ..models.milestones import MilestoneGroup +from ..models.milestones import MilestoneGroupAgeScore +from ..models.milestones import MilestoneGroupAgeScoreCollection +from .utils import _get_answer_session_child_ages_in_months +from .utils import _get_expected_age_from_scores +from .utils import add + + +# see: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance +# reason for not using existing package: bessel correction usually not respected +# we are using Welford's method here. 
# Welford's online algorithm carries three running values per bucket: the
# sample count, the mean and M2 (the sum of squared deviations from the mean).
# This necessitates recording the count next to the average and the stddev.
def _add_sample(
    count: int,
    mean: float,
    m2: float,
    new_value: float,
) -> tuple[int, float, float]:
    """
    Fold one new observation into running Welford statistics.

    Parameters
    ----------
    count : int
        Number of samples seen so far.
    mean : float
        Mean of the samples seen so far.
    m2 : float
        Sum of squared deviations from the mean of the samples seen so far.
        This is *not* the variance; see `_finalize_statistics`.
    new_value : float
        The observation to add.

    Returns
    -------
    tuple[int, float, float]
        The updated ``(count, mean, m2)``.
    """
    count += 1
    delta = new_value - mean
    mean += delta / count
    # the second delta deliberately uses the already updated mean
    delta2 = new_value - mean
    m2 += delta * delta2
    return count, mean, m2


def _finalize_statistics(
    count: int | np.ndarray,
    mean: float | np.ndarray,
    m2: float | np.ndarray,
) -> tuple[int | np.ndarray, float | np.ndarray, float | np.ndarray]:
    """
    Turn running Welford values into ``(count, mean, standard deviation)``.

    The standard deviation uses Bessel's correction (division by
    ``count - 1``). Buckets with fewer than two samples get a standard
    deviation of 0.

    Parameters
    ----------
    count : int | np.ndarray
        Number of samples, either a single integer or one entry per bucket.
    mean : float | np.ndarray
        Running mean(s).
    m2 : float | np.ndarray
        Running sum(s) of squared deviations from the mean. Not modified.

    Returns
    -------
    tuple
        ``(count, mean, stddev)``. In the array case NaNs in ``mean`` and
        ``stddev`` are replaced by zeros.

    Raises
    ------
    ValueError
        If ``count`` is neither an integer nor a numpy array.
    """
    # numpy integer scalars (e.g. a single element taken from an int array)
    # are accepted in addition to plain Python ints
    if isinstance(count, (int, np.integer)):
        if count < 2:
            return count, mean, 0.0
        return count, mean, np.sqrt(m2 / (count - 1))

    if isinstance(count, np.ndarray):
        m2_values = np.asarray(m2, dtype=float)
        enough_samples = count >= 2
        # write into a fresh array so the caller's m2 is left untouched;
        # buckets with too few samples keep the initial 0.0
        variance = np.zeros_like(m2_values)
        np.divide(m2_values, count - 1, out=variance, where=enough_samples)
        with np.errstate(invalid="ignore"):
            stddev = np.sqrt(variance)
        return count, np.nan_to_num(mean), np.nan_to_num(stddev)

    raise ValueError("given values must be of type int|float|np.ndarray")


def _get_statistics_by_age(
    answers: Sequence[MilestoneAnswer],
    child_ages: dict[int, int],
    count: np.ndarray | None = None,
    avg: np.ndarray | None = None,
    stddev: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Compute per-age score statistics, optionally continuing previous ones.

    Parameters
    ----------
    answers : Sequence[MilestoneAnswer]
        Answers to add. Each answer index is converted to a score by adding 1.
    child_ages : dict[int, int]
        Maps an answer session id to the array index (age in months) the
        answers of that session are counted under.
    count, avg, stddev : np.ndarray | None, optional
        Previously computed statistics, indexed by age in months. If any of
        them is None, all three start as zeros for ages 0..72. The arrays
        passed in are not modified.

    Returns
    -------
    tuple[np.ndarray, np.ndarray, np.ndarray]
        ``(count, avg, stddev)`` indexed by age in months. If ``child_ages``
        is empty the starting values are returned unchanged.
    """
    if count is None or avg is None or stddev is None:
        max_age_months = 72
        count = np.zeros(max_age_months + 1)
        avg = np.zeros(max_age_months + 1)
        stddev = np.zeros(max_age_months + 1)

    if not child_ages:
        return count, avg, stddev

    # work on copies so that the arrays handed in by the caller stay intact
    count = np.array(count)
    avg = np.array(avg, dtype=float)
    # Welford continues from M2, not from the variance:
    # M2 = variance * (count - 1), and 0 where fewer than two samples exist
    m2 = np.asarray(stddev, dtype=float) ** 2 * np.maximum(count - 1, 0)

    # NOTE(review): an answer whose session is missing from child_ages or whose
    # age lies outside the arrays raises KeyError/IndexError - confirm that the
    # callers can never produce such input.
    for answer in answers:
        age = child_ages[answer.answer_session_id]  # type: ignore
        count[age], avg[age], m2[age] = _add_sample(
            count[age], avg[age], m2[age], answer.answer + 1
        )

    return _finalize_statistics(count, avg, m2)


def _previous_statistics_as_arrays(
    last_statistics,
) -> tuple[np.ndarray | None, np.ndarray | None, np.ndarray | None]:
    """
    Convert a stored score collection into ``(count, avg, stddev)`` arrays.

    Returns three Nones if there is no previous collection. The scores are
    ordered by ``age_months`` so that the array index is the age in months.
    """
    if last_statistics is None:
        return None, None, None

    scores = sorted(last_statistics.scores, key=lambda score: score.age_months)
    return (
        np.array([score.count for score in scores]),
        np.array([score.avg_score for score in scores]),
        np.array([score.stddev_score for score in scores]),
    )


def calculate_milestone_statistics_by_age(
    session: SessionDep,
    milestone_id: int,
) -> MilestoneAgeScoreCollection:
    """
    Build a new per-age score collection for one milestone.

    If a collection already exists for the milestone, its values are used as
    the starting point and only answers from answer sessions created after
    that collection are added. Otherwise all answers of the milestone are used.

    Parameters
    ----------
    session : SessionDep
        Database session used for all queries.
    milestone_id : int
        Id of the milestone to compute the statistics for.

    Returns
    -------
    MilestoneAgeScoreCollection
        A new collection with one score entry per age in months. It is not
        added to the session by this function.
    """
    # get the newest statistics for the milestone
    last_statistics = session.exec(
        select(MilestoneAgeScoreCollection)
        .where(col(MilestoneAgeScoreCollection.milestone_id) == milestone_id)
        .order_by(col(MilestoneAgeScoreCollection.created_at).desc())
    ).first()

    previous_count, previous_avg, previous_stddev = _previous_statistics_as_arrays(
        last_statistics
    )

    child_ages = _get_answer_session_child_ages_in_months(session)

    if last_statistics is None:
        answers_query = select(MilestoneAnswer).where(
            MilestoneAnswer.milestone_id == milestone_id
        )
    else:
        # only answers that were not part of the previous statistics
        answers_query = (
            select(MilestoneAnswer)
            .join(
                MilestoneAnswerSession,
                MilestoneAnswer.answer_session_id == MilestoneAnswerSession.id,
            )
            .where(
                MilestoneAnswer.milestone_id == milestone_id,
                MilestoneAnswerSession.created_at > last_statistics.created_at,
            )
        )
    answers = session.exec(answers_query).all()

    count, avg, stddev = _get_statistics_by_age(
        answers,
        child_ages,
        count=previous_count,
        avg=previous_avg,
        stddev=previous_stddev,
    )
    expected_age = int(_get_expected_age_from_scores(avg))

    # NOTE(review): the id is derived from the previous collection of the same
    # milestone only - confirm it cannot collide with the collections of other
    # milestones, or let the database assign it.
    new_id = last_statistics.id + 1 if last_statistics is not None else 1
    return MilestoneAgeScoreCollection(
        id=new_id,
        milestone_id=milestone_id,
        expected_age=expected_age,
        created_at=datetime.datetime.now(),
        scores=[
            MilestoneAgeScore(
                collection_id=new_id,
                # plain Python numbers instead of numpy scalars for the models
                count=int(count[age]),
                avg_score=float(avg[age]),
                stddev_score=float(stddev[age]),
                age_months=age,
                # TODO: placeholder algorithm? What does the model behind this
                # really look like?
                expected_score=4 if age >= expected_age else 1,
            )
            for age in range(len(avg))
        ],
    )


def calculate_milestonegroup_statistics_by_age(
    session: SessionDep,
    milestonegroup_id,
) -> MilestoneGroupAgeScoreCollection:
    """
    Build a new per-age score collection for one milestone group.

    If a collection already exists for the group, its values are used as the
    starting point and only answers from answer sessions created after that
    collection are added. Otherwise all answers of the group are used.

    Parameters
    ----------
    session : SessionDep
        Database session used for all queries.
    milestonegroup_id
        Id of the milestone group to compute the statistics for.

    Returns
    -------
    MilestoneGroupAgeScoreCollection
        A new collection with one score entry per age in months. It is not
        added to the session by this function.
    """
    # get the newest statistics for the milestonegroup
    last_statistics = session.exec(
        select(MilestoneGroupAgeScoreCollection)
        .where(
            col(MilestoneGroupAgeScoreCollection.milestonegroup_id) == milestonegroup_id
        )
        .order_by(col(MilestoneGroupAgeScoreCollection.created_at).desc())
    ).first()

    # without previous statistics _get_statistics_by_age starts from zeros
    previous_count, previous_avg, previous_stddev = _previous_statistics_as_arrays(
        last_statistics
    )

    child_ages = _get_answer_session_child_ages_in_months(session)

    if last_statistics is None:
        answer_query = select(MilestoneAnswer).where(
            col(MilestoneAnswer.milestone_group_id) == milestonegroup_id
        )
    else:
        # only answers that were not part of the previous statistics
        answer_query = (
            select(MilestoneAnswer)
            .join(
                MilestoneAnswerSession,
                MilestoneAnswer.answer_session_id == MilestoneAnswerSession.id,
            )
            .where(
                MilestoneAnswer.milestone_group_id == milestonegroup_id,
                MilestoneAnswerSession.created_at > last_statistics.created_at,
            )
        )

    answers = session.exec(answer_query).all()

    count, avg, stddev = _get_statistics_by_age(
        answers,
        child_ages,
        count=previous_count,
        avg=previous_avg,
        stddev=previous_stddev,
    )

    # NOTE(review): the id is derived from the previous collection of the same
    # milestone group only - confirm it cannot collide with the collections of
    # other groups, or let the database assign it.
    new_id = last_statistics.id + 1 if last_statistics is not None else 1
    return MilestoneGroupAgeScoreCollection(
        id=new_id,
        milestonegroup_id=milestonegroup_id,
        scores=[
            MilestoneGroupAgeScore(
                collection_id=new_id,
                age_months=age,
                # plain Python numbers instead of numpy scalars for the models
                count=int(count[age]),
                avg_score=float(avg[age]),
                stddev_score=float(stddev[age]),
                milestonegroup_id=milestonegroup_id,
            )
            for age in range(len(avg))
        ],
        created_at=datetime.datetime.now(),
    )


def recompute_milestonegroup_statistics(session: SessionDep):
    """
    Compute a new statistics collection for every milestone group and pass
    each score and the collection itself to `add`.
    """
    # NOTE: the original intent was to refresh only statistics older than some
    # timedelta; currently every milestone group is recomputed unconditionally.
    milestonegroup_ids = session.exec(select(MilestoneGroup.id)).all()
    for milestonegroup_id in milestonegroup_ids:
        statistics = calculate_milestonegroup_statistics_by_age(
            session, milestonegroup_id
        )
        for score in statistics.scores:
            add(session, score)
        add(session, statistics)


def recompute_milestone_statistics(session: SessionDep):
    """
    Compute a new statistics collection for every milestone and pass each
    score and the collection itself to `add`.
    """
    # NOTE: the original intent was to refresh only statistics older than some
    # timedelta; currently every milestone is recomputed unconditionally.
    milestone_ids = session.exec(select(Milestone.id)).all()
    for milestone_id in milestone_ids:
        statistics = calculate_milestone_statistics_by_age(session, milestone_id)  # type: ignore
        for score in statistics.scores:
            add(session, score)
        add(session, statistics)
_description_ - """ - max_age_months = 72 - avg_scores = np.zeros(max_age_months + 1) - stddev_scores = np.zeros(max_age_months + 1) - counts = np.zeros_like(avg_scores) - if child_ages == {}: - return avg_scores, stddev_scores - - # compute average - for answer in answers: - age = child_ages[answer.answer_session_id] # type: ignore - # convert 0-3 answer index to 1-4 score - avg_scores[age] += answer.answer + 1 - counts[age] += 1 - - with np.errstate(invalid="ignore"): - avg_scores /= counts - - # compute standard deviation - for answer in answers: - age = child_ages[answer.answer_session_id] # type: ignore - stddev_scores[age] += ((answer.answer + 1) - avg_scores[age]) ** 2 - - with np.errstate(invalid="ignore"): - stddev_scores = np.sqrt(stddev_scores / np.max(counts - 1, 0)) - - # replace NaNs (due to zero counts) with zeros - avg = np.nan_to_num(avg_scores) - stddev = np.nan_to_num(stddev_scores) - - return avg, stddev - - -def calculate_milestone_statistics_by_age( - session: SessionDep, - milestone_id: int, - answers: Sequence[MilestoneAnswer] | None = None, -) -> MilestoneAgeScoreCollection: - """ - _summary_ - - Parameters - ---------- - session : SessionDep - _description_ - milestone_id : int - _description_ - answers : Sequence[MilestoneAnswer] | None, optional - _description_, by default None - - Returns - ------- - MilestoneAgeScoreCollection - _description_ - """ - child_ages = _get_answer_session_child_ages_in_months(session) - # FIXME: change this to an online algorithm that starts with the last known statistcs - # and then adds the new answers - if answers is None: - answers = session.exec( - select(MilestoneAnswer).where( - col(MilestoneAnswer.milestone_id) == milestone_id - ) - ).all() - - avg, stddev = _get_statistics_by_age(answers, child_ages) - expected_age = _get_expected_age_from_scores(avg) - return MilestoneAgeScoreCollection( - id=None, - milestone_id=milestone_id, - expected_age=expected_age, - created_at=datetime.datetime.now(), - 
scores=[ - MilestoneAgeScore( - id=None, - collection_id=None, - avg_score=avg[age], - stddev_score=stddev[age], - age_months=age, - expected_score=4 - if age >= expected_age - else 1, # TODO: placeholder algorithm? how does the model behind this look like really? - ) - for age in range(0, len(avg)) - ], - ) - - -def calculate_milestonegroup_statistics_by_age( - session: SessionDep, - milestonegroup_id, - answers: Sequence[MilestoneAnswer] | None = None, -) -> MilestoneGroupAgeScoreCollection: - """ - _summary_ - - Parameters - ---------- - session : SessionDep - _description_ - milestonegroup_id : _type_ - _description_ - answers : Sequence[MilestoneAnswer] | None, optional - _description_, by default None - - Returns - ------- - MilestoneGroupAgeScoreCollection - _description_ - """ - # FIXME: change this to an online algorithm that starts with the last known statistcs - # and then adds the new answers - child_ages = _get_answer_session_child_ages_in_months(session) - - if answers is None: - answers = session.exec( - select(MilestoneAnswer).where( - col(MilestoneAnswer.milestone_group_id) == milestonegroup_id - ) - ).all() - - avg, stddev = _get_statistics_by_age(answers, child_ages) - return MilestoneGroupAgeScoreCollection( - id=None, - milestonegroup_id=milestonegroup_id, - scores=[ - MilestoneGroupAgeScore( - id=None, - collection_id=None, - age_months=age, - avg_score=avg[age], - stddev_score=stddev[age], - milestonegroup_id=milestonegroup_id, - ) - for age in range(0, len(avg)) - ], - created_at=datetime.datetime.now(), - ) - - -def recompute_milestonegroup_statistics(session: SessionDep): - # fetch all milestonegroup statsitcs and check how old they are. 
from math import isclose

import numpy as np
from sqlmodel import select

from mondey_backend.models.milestones import MilestoneAnswer
from mondey_backend.models.milestones import MilestoneGroup
from mondey_backend.routers.statistics import _add_sample
from mondey_backend.routers.statistics import _finalize_statistics
from mondey_backend.routers.statistics import _get_statistics_by_age
from mondey_backend.routers.statistics import calculate_milestone_statistics_by_age
from mondey_backend.routers.statistics import calculate_milestonegroup_statistics_by_age


def test_online_statistics_computation():
    # seeded generator so that the test data is reproducible
    rng = np.random.default_rng(42)
    data = rng.normal(0, 1, 200)
    data_first = data[0:100]
    data_second = data[100:200]

    count = 0
    avg = 0
    var = 0

    for v in data_first:
        count, avg, var = _add_sample(count, avg, var, v)

    # `var` keeps holding the running M2 value; only `std` is finalized
    count, avg, std = _finalize_statistics(count, avg, var)

    assert count == len(data_first)
    assert np.isclose(avg, np.mean(data_first))
    assert np.isclose(std, np.std(data_first, ddof=1))

    # continue with the second half on top of the running values
    for v in data_second:
        count, avg, var = _add_sample(count, avg, var, v)

    count, avg, std = _finalize_statistics(count, avg, var)

    assert count == len(data)
    assert np.isclose(avg, np.mean(data))
    assert np.isclose(std, np.std(data, ddof=1))


def test_online_statistics_computation_too_little_data():
    # a single sample: mean is the sample, stddev falls back to 0
    data = [2.42342]
    count = 0
    avg = 0
    var = 0
    for v in data:
        count, avg, var = _add_sample(count, avg, var, v)
    count, avg, std = _finalize_statistics(count, avg, var)

    assert count == 1
    assert avg == 2.42342
    assert std == 0

    # no samples at all
    data = []
    count = 0
    avg = 0
    var = 0
    for v in data:
        count, avg, var = _add_sample(count, avg, var, v)
    count, avg, std = _finalize_statistics(count, avg, var)

    assert count == 0
    assert avg == 0
    assert std == 0


def test_get_score_statistics_by_age(session):
    answers = session.exec(select(MilestoneAnswer)).all()
    child_ages = {1: 5, 2: 3, 3: 8}

    # the function returns (count, avg, stddev)
    count, avg, stddev = _get_statistics_by_age(answers, child_ages)

    # every answer of session 1 is counted under age 5
    assert count[5] == len(
        [answer for answer in answers if answer.answer_session_id == 1]
    )

    assert isclose(avg[5], 1.5)
    assert isclose(avg[3], 3.5)
    assert isclose(avg[8], 3.0)

    assert np.isclose(
        stddev[5],
        np.std(
            [answer.answer + 1 for answer in answers if answer.answer_session_id == 1],
            ddof=1,
        ),
    )

    assert np.isclose(
        stddev[3],
        np.std(
            [answer.answer + 1 for answer in answers if answer.answer_session_id == 2],
            ddof=1,
        ),
    )

    assert np.isclose(
        stddev[8],
        np.nan_to_num(
            np.std(
                [
                    answer.answer + 1
                    for answer in answers
                    if answer.answer_session_id == 3
                ],
                ddof=1,
            )
        ),
    )

    child_ages = {}  # no answer sessions ==> empty child ages
    count, avg, stddev = _get_statistics_by_age(answers, child_ages)
    assert np.all(np.isclose(count, 0))
    assert np.all(np.isclose(avg, 0))
    assert np.all(np.isclose(stddev, 0))

    child_ages = {1: 5, 2: 3, 3: 8}
    answers = []  # no answers ==> empty answers
    count, avg, stddev = _get_statistics_by_age(answers, child_ages)
    assert np.all(np.isclose(count, 0))
    assert np.all(np.isclose(avg, 0))
    assert np.all(np.isclose(stddev, 0))


def test_calculate_milestone_statistics_by_age(session):
    # calculate_milestone_statistics_by_age
    mscore = calculate_milestone_statistics_by_age(session, 1)

    # only some are filled: milestone 1 is part of answersession 1 (age 8) and 2 (age 9) with
    # answers 1, 3 => 2, 4 hence std = 0 and avg = answers + 1
    assert mscore.milestone_id == 1
    assert np.isclose(mscore.scores[8].avg_score, 2.0)
    assert np.isclose(mscore.scores[8].stddev_score, 0.0)
    assert np.isclose(mscore.scores[9].avg_score, 4.0)
    assert np.isclose(mscore.scores[9].stddev_score, 0.0)
    assert np.isclose(mscore.scores[42].avg_score, 0.0)
    assert np.isclose(mscore.scores[42].stddev_score, 0.0)

    for score in mscore.scores:
        if score.age_months not in [8, 9]:
            assert np.isclose(score.avg_score, 0.0)
            assert np.isclose(score.stddev_score, 0.0)

        if score.age_months > 8:
            assert np.isclose(score.expected_score, 4.0)
        else:
            assert np.isclose(score.expected_score, 1.0)


def test_calculate_milestonegroup_statistics(session):
    milestone_group = session.exec(
        select(MilestoneGroup).where(MilestoneGroup.id == 1)
    ).first()

    # milestonegroup 1 has 2 milestones (1, 2) with answers 1, 0 --> 2, 1;
    # this belongs wholly to answersession 1 with age 8
    avg_1 = np.mean([1, 2])
    std_1 = np.std([1, 2], ddof=1)
    # the same 2 milestones (1, 2) have answers 3, 2 --> 4, 3
    # in answersession 2 with age 9
    avg_2 = np.mean([4, 3])
    std_2 = np.std([4, 3], ddof=1)

    # answersession 3 with age 42 has no answers for milestonegroup 1

    score = calculate_milestonegroup_statistics_by_age(
        session,
        milestone_group.id,
    )

    # derived floats are compared with a tolerance instead of exact equality
    assert score.milestonegroup_id == 1
    assert np.isclose(score.scores[8].avg_score, avg_1)
    assert np.isclose(score.scores[8].stddev_score, std_1)
    assert score.scores[8].age_months == 8
    assert score.scores[8].milestonegroup_id == 1

    assert np.isclose(score.scores[9].avg_score, avg_2)
    assert np.isclose(score.scores[9].stddev_score, std_2)
    assert score.scores[9].age_months == 9
    assert score.scores[9].milestonegroup_id == 1

    for age in range(0, len(score.scores)):
        if age not in [8, 9]:
            assert score.scores[age].avg_score == 0
            assert score.scores[age].stddev_score == 0
            assert score.scores[age].milestonegroup_id == 1
answer.answer_session_id == 1], - ddof=1, - ), - ) - - assert np.isclose( - stddev[3], - np.std( - [answer.answer + 1 for answer in answers if answer.answer_session_id == 2], - ddof=1, - ), - ) - - assert np.isclose( - stddev[8], - np.nan_to_num( - np.std( - [ - answer.answer + 1 - for answer in answers - if answer.answer_session_id == 3 - ], - ddof=1, - ) - ), - ) - - child_ages = {} # no answer sessions ==> empty child ages - avg, stddev = _get_statistics_by_age(answers, child_ages) - assert np.all(np.isclose(avg, 0)) - assert np.all(np.isclose(stddev, 0)) - - child_ages = {1: 5, 2: 3, 3: 8} - answers = [] # no answers ==> empty answers - avg, stddev = _get_statistics_by_age(answers, child_ages) - assert np.all(np.isclose(avg, 0)) - assert np.all(np.isclose(stddev, 0)) - - -def test_calculate_milestone_statistics_by_age(session): - # calculate_milestone_statistics_by_age - mscore = calculate_milestone_statistics_by_age(session, 1) - - # only some are filled: milestone 1 is part of answersession 1 (age 8) and 2 (age 9) with - # answers 1, 3 => 2, 4 hence std = 0 and avg = answers + 1 - assert mscore.milestone_id == 1 - assert np.isclose(mscore.scores[8].avg_score, 2.0) - assert np.isclose(mscore.scores[8].stddev_score, 0.0) - assert np.isclose(mscore.scores[9].avg_score, 4.0) - assert np.isclose(mscore.scores[9].stddev_score, 0.0) - assert np.isclose(mscore.scores[42].avg_score, 0.0) - assert np.isclose(mscore.scores[42].stddev_score, 0.0) - - for score in mscore.scores: - if score.age_months not in [8, 9]: - assert np.isclose(score.avg_score, 0.0) - assert np.isclose(score.stddev_score, 0.0) - - if score.age_months > 8: - assert np.isclose(score.expected_score, 4.0) - else: - assert np.isclose(score.expected_score, 1.0) - - -def test_calculate_milestonegroup_statistics(session): - milestone_group = session.exec( - select(MilestoneGroup).where(MilestoneGroup.id == 1) - ).first() - - # milestonegroup 1 has 2 milestones (1, 2 with answers 1, 0 --> 2, 1, - # this 
belongs wholely to answersession 1 with age 8 - avg_1 = np.mean([1, 2]) - std_1 = np.std([1, 2], ddof=1) - # milestonegroup 2 has 2 milestones (1, 2 with answers 3, 2 --> 4, 3 - # this belongs wholely to answersession 2 with age 9 - avg_2 = np.mean([4, 3]) - std_2 = np.std([4, 3], ddof=1) - - # answersession 3 with age 42 has no answers for milestonegroup 1 - - score = calculate_milestonegroup_statistics_by_age( - session, - milestone_group.id, - ) - - assert score.milestonegroup_id == 1 - assert score.scores[8].avg_score == avg_1 - assert score.scores[8].stddev_score == std_1 - assert score.scores[8].age_months == 8 - assert score.scores[8].milestonegroup_id == 1 - - assert score.scores[9].avg_score == avg_2 - assert score.scores[9].stddev_score == std_2 - assert score.scores[9].age_months == 9 - assert score.scores[9].milestonegroup_id == 1 - - for age in range(0, len(score.scores)): - if age not in [8, 9]: - assert score.scores[age].avg_score == 0 - assert score.scores[age].stddev_score == 0 - assert score.scores[age].milestonegroup_id == 1