From 47ba3bd1134004d0f06becda0aca4bdd054b1258 Mon Sep 17 00:00:00 2001 From: Martin Kess Date: Fri, 3 Jan 2025 20:54:51 -0500 Subject: [PATCH 1/5] Add memory to load calc --- .../livekit/agents/utils/hw/__init__.py | 17 ++++- .../livekit/agents/utils/hw/memory.py | 63 +++++++++++++++++++ livekit-agents/livekit/agents/worker.py | 17 +++-- 3 files changed, 91 insertions(+), 6 deletions(-) create mode 100644 livekit-agents/livekit/agents/utils/hw/memory.py diff --git a/livekit-agents/livekit/agents/utils/hw/__init__.py b/livekit-agents/livekit/agents/utils/hw/__init__.py index 0610f3748..3a1fa968a 100644 --- a/livekit-agents/livekit/agents/utils/hw/__init__.py +++ b/livekit-agents/livekit/agents/utils/hw/__init__.py @@ -1,3 +1,18 @@ from .cpu import CGroupV2CPUMonitor, CPUMonitor, DefaultCPUMonitor, get_cpu_monitor +from .memory import ( + CGroupV2MemoryMonitor, + DefaultMemoryMonitor, + MemoryMonitor, + get_memory_monitor, +) -__all__ = ["get_cpu_monitor", "CPUMonitor", "CGroupV2CPUMonitor", "DefaultCPUMonitor"] +__all__ = [ + "get_cpu_monitor", + "CPUMonitor", + "CGroupV2CPUMonitor", + "DefaultCPUMonitor", + "get_memory_monitor", + "MemoryMonitor", + "CGroupV2MemoryMonitor", + "DefaultMemoryMonitor", +] diff --git a/livekit-agents/livekit/agents/utils/hw/memory.py b/livekit-agents/livekit/agents/utils/hw/memory.py new file mode 100644 index 000000000..4edb9607f --- /dev/null +++ b/livekit-agents/livekit/agents/utils/hw/memory.py @@ -0,0 +1,63 @@ +import os +from abc import ABC, abstractmethod + +import psutil + + +class MemoryMonitor(ABC): + @abstractmethod + def memory_total(self) -> int: + """Total memory available in bytes.""" + pass + + @abstractmethod + def memory_used(self) -> int: + """Memory currently in use in bytes.""" + pass + + @abstractmethod + def memory_percent(self) -> float: + """Memory usage percentage between 0 and 1""" + pass + + +class DefaultMemoryMonitor(MemoryMonitor): + def memory_total(self) -> int: + return psutil.virtual_memory().total + + def memory_used(self) -> int: + return psutil.virtual_memory().used + + def memory_percent(self) -> float: + return psutil.virtual_memory().percent / 100.0 + + +class CGroupV2MemoryMonitor(MemoryMonitor): + def memory_total(self) -> int: + try: + with open("/sys/fs/cgroup/memory.max", "r") as f: + max_memory = f.read().strip() + if max_memory == "max": + return psutil.virtual_memory().total + return int(max_memory) + except FileNotFoundError: + return psutil.virtual_memory().total + + def memory_used(self) -> int: + with open("/sys/fs/cgroup/memory.current", "r") as f: + return int(f.read().strip()) + + def memory_percent(self) -> float: + used = self.memory_used() + total = self.memory_total() + return min(used / total, 1.0) + + +def get_memory_monitor() -> MemoryMonitor: + if _is_cgroup_v2(): + return CGroupV2MemoryMonitor() + return DefaultMemoryMonitor() + + +def _is_cgroup_v2() -> bool: + return os.path.exists("/sys/fs/cgroup/memory.current") diff --git a/livekit-agents/livekit/agents/worker.py b/livekit-agents/livekit/agents/worker.py index 4708a34d3..4ca1807e0 100644 --- a/livekit-agents/livekit/agents/worker.py +++ b/livekit-agents/livekit/agents/worker.py @@ -53,7 +53,7 @@ RunningJobInfo, ) from .log import DEV_LEVEL, logger -from .utils.hw import get_cpu_monitor +from .utils.hw import get_cpu_monitor, get_memory_monitor from .version import __version__ ASSIGNMENT_TIMEOUT = 7.5 @@ -77,8 +77,11 @@ class _DefaultLoadCalc: _instance = None def __init__(self) -> None: - self._m_avg = utils.MovingAverage(5) # avg over 2.5 + # Take average reading over 2.5s of CPU and memory + self._m_avg_cpu = utils.MovingAverage(5) + self._m_avg_mem = utils.MovingAverage(5) self._cpu_monitor = get_cpu_monitor() + self._mem_monitor = get_memory_monitor() self._thread = threading.Thread( target=self._calc_load, daemon=True, name="worker_cpu_load_monitor" ) @@ -88,19 +91,23 @@ def __init__(self) -> None: def _calc_load(self) -> None: while True: cpu_p = self._cpu_monitor.cpu_percent(interval=0.5) + mem_p = self._mem_monitor.memory_percent() with self._lock: - self._m_avg.add_sample(cpu_p) + self._m_avg_cpu.add_sample(cpu_p) + self._m_avg_mem.add_sample(mem_p) def _get_avg(self) -> float: with self._lock: - return self._m_avg.get_avg() + return self._m_avg_cpu.get_avg(), self._m_avg_mem.get_avg() @classmethod def get_load(cls, worker: Worker) -> float: if cls._instance is None: cls._instance = _DefaultLoadCalc() - return cls._instance._m_avg.get_avg() + return max( + cls._instance._m_avg_cpu.get_avg(), cls._instance._m_avg_mem.get_avg() + ) @dataclass From 8d7d906bee0b95a4c47fd462ec262b38287d707c Mon Sep 17 00:00:00 2001 From: Martin Kess Date: Fri, 3 Jan 2025 20:55:34 -0500 Subject: [PATCH 2/5] Add memory to load calc --- livekit-agents/livekit/agents/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-agents/livekit/agents/worker.py b/livekit-agents/livekit/agents/worker.py index 4ca1807e0..a125cf760 100644 --- a/livekit-agents/livekit/agents/worker.py +++ b/livekit-agents/livekit/agents/worker.py @@ -96,7 +96,7 @@ def _calc_load(self) -> None: self._m_avg_cpu.add_sample(cpu_p) self._m_avg_mem.add_sample(mem_p) - def _get_avg(self) -> float: + def _get_avg(self) -> tuple[float, float]: with self._lock: return self._m_avg_cpu.get_avg(), self._m_avg_mem.get_avg() From df128161725dd15eaa8ac2e0629918679188110a Mon Sep 17 00:00:00 2001 From: martin-purplefish Date: Fri, 3 Jan 2025 20:56:25 -0500 Subject: [PATCH 3/5] Create twelve-hotels-march.md --- .changeset/twelve-hotels-march.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/twelve-hotels-march.md diff --git a/.changeset/twelve-hotels-march.md b/.changeset/twelve-hotels-march.md new file mode 100644 index 000000000..c7961371b --- /dev/null +++ b/.changeset/twelve-hotels-march.md @@ -0,0 +1,5 @@ +--- +"livekit-agents": patch +--- + +Add memory to load calc From dd19c6cc8c2da7b634ba17f248f8f235389e097d Mon Sep 17 00:00:00 2001 From: martin-purplefish Date: Fri, 3 Jan 2025 20:58:06 -0500 Subject: [PATCH 4/5] Update twelve-hotels-march.md --- .changeset/twelve-hotels-march.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/twelve-hotels-march.md b/.changeset/twelve-hotels-march.md index c7961371b..bec7983ed 100644 --- a/.changeset/twelve-hotels-march.md +++ b/.changeset/twelve-hotels-march.md @@ -2,4 +2,4 @@ "livekit-agents": patch --- -Add memory to load calc +Add memory to load calc. The default load calc now returns the maximum of the memory usage and CPU usage. From c29ed220fd6b07b79201ca3db2cb0662140eebbf Mon Sep 17 00:00:00 2001 From: Martin Kess Date: Sun, 5 Jan 2025 10:55:28 -0500 Subject: [PATCH 5/5] use avg function --- livekit-agents/livekit/agents/worker.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/livekit-agents/livekit/agents/worker.py b/livekit-agents/livekit/agents/worker.py index a125cf760..85f4396c7 100644 --- a/livekit-agents/livekit/agents/worker.py +++ b/livekit-agents/livekit/agents/worker.py @@ -96,7 +96,7 @@ def _calc_load(self) -> None: self._m_avg_cpu.add_sample(cpu_p) self._m_avg_mem.add_sample(mem_p) - def _get_avg(self) -> tuple[float, float]: + def _get_avgs(self) -> tuple[float, float]: with self._lock: return self._m_avg_cpu.get_avg(), self._m_avg_mem.get_avg() @@ -105,9 +105,7 @@ def get_load(cls, worker: Worker) -> float: if cls._instance is None: cls._instance = _DefaultLoadCalc() - return max( - cls._instance._m_avg_cpu.get_avg(), cls._instance._m_avg_mem.get_avg() - ) + return max(*cls._instance._get_avgs()) @dataclass