From 8be81b8446bb179f379e3f98ec5572bbd439e36a Mon Sep 17 00:00:00 2001 From: Reid Mello <30907815+rjmello@users.noreply.github.com> Date: Tue, 24 Oct 2023 23:33:01 -0400 Subject: [PATCH] Populate `GlobusComputeEngine` status report --- ...05335_30907815+rjmello_gcengine_status.rst | 6 + .../engines/globus_compute.py | 168 ++++++++++++++---- .../tests/unit/test_status_reporting.py | 55 ++++++ 3 files changed, 190 insertions(+), 39 deletions(-) create mode 100644 changelog.d/20231025_205335_30907815+rjmello_gcengine_status.rst diff --git a/changelog.d/20231025_205335_30907815+rjmello_gcengine_status.rst b/changelog.d/20231025_205335_30907815+rjmello_gcengine_status.rst new file mode 100644 index 000000000..a7c799867 --- /dev/null +++ b/changelog.d/20231025_205335_30907815+rjmello_gcengine_status.rst @@ -0,0 +1,6 @@ +New Functionality +^^^^^^^^^^^^^^^^^ + +- Added support for endpoint status reports when using ``GlobusComputeEngine``. + The report includes information such as the total number of active workers, + idle workers, and pending tasks. 
\ No newline at end of file diff --git a/compute_endpoint/globus_compute_endpoint/engines/globus_compute.py b/compute_endpoint/globus_compute_endpoint/engines/globus_compute.py index 79910482e..3c733bae7 100644 --- a/compute_endpoint/globus_compute_endpoint/engines/globus_compute.py +++ b/compute_endpoint/globus_compute_endpoint/engines/globus_compute.py @@ -87,32 +87,87 @@ def _submit( def provider(self): return self.executor.provider - def get_outstanding_breakdown(self) -> t.List[t.Tuple[str, int, bool]]: + def get_connected_managers(self) -> t.List[t.Dict[str, t.Any]]: """ + Returns + ------- + List of dicts containing info for all connected managers + """ + return self.executor.connected_managers() + + def get_total_managers(self, managers: t.List[t.Dict[str, t.Any]]) -> int: + """ + Parameters + ---------- + managers: list[dict[str, Any]] + List of dicts containing info for all connected managers + + Returns + ------- + Total number of managers + """ + return len(managers) + + def get_total_active_managers(self, managers: t.List[t.Dict[str, t.Any]]) -> int: + """ + Parameters + ---------- + managers: list[dict[str, Any]] + List of dicts containing info for all connected managers + + Returns + ------- + Number of managers that have capacity for new tasks + """ + count = 0 + for mgr in managers: + if mgr["active"]: + count += 1 + return count + + def get_outstanding_breakdown( + self, managers: t.Optional[t.List[t.Dict[str, t.Any]]] = None + ) -> t.List[t.Tuple[str, int, bool]]: + """ + Parameters + ---------- + managers: list[dict[str, Any]] | None + List of dicts containing info for all connected managers Returns ------- List of tuples of the form (component, # of tasks on component, active?) 
""" + if managers is None: + managers = self.get_connected_managers() total_task_count = self.executor.outstanding - manager_info: t.List[t.Dict[str, t.Any]] = self.executor.connected_managers() - breakdown = [(m["manager"], m["tasks"], m["active"]) for m in manager_info] - total_count_managers = sum([m["tasks"] for m in manager_info]) + breakdown = [(m["manager"], m["tasks"], m["active"]) for m in managers] + total_count_managers = sum([m["tasks"] for m in managers]) task_count_interchange = total_task_count - total_count_managers breakdown = [("interchange", task_count_interchange, True)] + breakdown return breakdown - def get_total_tasks_outstanding(self): + def get_total_tasks_outstanding(self) -> dict: """ - Returns ------- - Returns a dict of type {str_task_type: count_tasks} + Dict of type {str_task_type: count_tasks} + """ + return {"RAW": self.executor.outstanding} + + def get_total_tasks_pending(self, managers: t.List[t.Dict[str, t.Any]]) -> int: + """ + Parameters + ---------- + managers: list[dict[str, Any]] + List of dicts containing info for all connected managers + Returns + ------- + Total number of pending tasks """ - outstanding = self.get_outstanding_breakdown() - total = sum([component[1] for component in outstanding]) - return {"RAW": total} + outstanding = self.get_outstanding_breakdown(managers=managers) + return outstanding[0][1] # Queued in interchange def provider_status(self): status = [] @@ -122,10 +177,40 @@ def provider_status(self): status = self.provider.status(job_ids=job_ids) return status - def get_total_live_workers(self) -> int: - manager_info: t.List[dict[str, t.Any]] = self.executor.connected_managers() - worker_count = sum([mgr["worker_count"] for mgr in manager_info]) - return worker_count + def get_total_live_workers( + self, managers: t.Optional[t.List[t.Dict[str, t.Any]]] = None + ) -> int: + """ + Parameters + ---------- + managers: list[dict[str, Any]] + List of dicts containing info for all connected managers + + 
Returns + ------- + Total number of live workers + """ + if managers is None: + managers = self.get_connected_managers() + return sum([mgr["worker_count"] for mgr in managers]) + + def get_total_idle_workers(self, managers: t.List[t.Dict[str, t.Any]]) -> int: + """ + Parameters + ---------- + managers: list[dict[str, Any]] + List of dicts containing info for all connected managers + + Returns + ------- + Total number of workers that are not actively running tasks + """ + idle_workers = 0 + for mgr in managers: + workers = mgr["worker_count"] + tasks = mgr["tasks"] + idle_workers += max(0, workers - tasks) + return idle_workers def scale_out(self, blocks: int): logger.info(f"Scaling out {blocks} blocks") @@ -136,41 +221,46 @@ def scale_in(self, blocks: int): to_kill = list(self.executor.blocks.values())[:blocks] return self.provider.cancel(to_kill) + @property + def scaling_enabled(self) -> bool: + """Indicates whether scaling is possible""" + max_blocks = self.executor.provider.max_blocks + return max_blocks > 0 + def get_status_report(self) -> EPStatusReport: """ - endpoint_id: uuid.UUID - ep_status_report: t.Dict[str, t.Any] - task_statuses: t.Dict[str, t.List[TaskTransition]] Returns ------- + Object containing info on the current status of the endpoint """ + managers = self.get_connected_managers() executor_status: t.Dict[str, t.Any] = { - "task_id": -2, + "task_id": -2, # Deprecated "info": { - "total_cores": 0, - "total_mem": 0, - "new_core_hrs": 0, - "total_core_hrs": 0, - "managers": 0, - "active_managers": 0, - "total_workers": 0, - "idle_workers": 0, - "pending_tasks": 0, - "outstanding_tasks": 0, - "worker_mode": 0, - "scheduler_mode": 0, - "scaling_enabled": False, - "mem_per_worker": 0, - "cores_per_worker": 0, - "prefetch_capacity": 0, - "max_blocks": 1, - "min_blocks": 1, - "max_workers_per_node": 0, - "nodes_per_block": 1, + "total_cores": 0, # TODO + "total_mem": 0, # TODO + "new_core_hrs": 0, # TODO + "total_core_hrs": 0, # TODO + "managers": 
self.get_total_managers(managers=managers), + "active_managers": self.get_total_active_managers(managers=managers), + "total_workers": self.get_total_live_workers(managers=managers), + "idle_workers": self.get_total_idle_workers(managers=managers), + "pending_tasks": self.get_total_tasks_pending(managers=managers), + "outstanding_tasks": self.get_total_tasks_outstanding()["RAW"], + "worker_mode": 0, # Deprecated + "scheduler_mode": 0, # Deprecated + "scaling_enabled": self.scaling_enabled, + "mem_per_worker": self.executor.mem_per_worker, + "cores_per_worker": self.executor.cores_per_worker, + "prefetch_capacity": self.executor.prefetch_capacity, + "max_blocks": self.executor.provider.max_blocks, + "min_blocks": self.executor.provider.min_blocks, + "max_workers_per_node": self.executor.max_workers, + "nodes_per_block": self.executor.provider.nodes_per_block, "heartbeat_period": self._heartbeat_period, }, } - task_status_deltas: t.Dict[str, t.List[TaskTransition]] = {} + task_status_deltas: t.Dict[str, t.List[TaskTransition]] = {} # TODO return EPStatusReport( endpoint_id=self.endpoint_id, global_state=executor_status, diff --git a/compute_endpoint/tests/unit/test_status_reporting.py b/compute_endpoint/tests/unit/test_status_reporting.py index 88c7edb6f..2245c24d9 100644 --- a/compute_endpoint/tests/unit/test_status_reporting.py +++ b/compute_endpoint/tests/unit/test_status_reporting.py @@ -1,9 +1,11 @@ import logging +import parsl import pytest from globus_compute_common import messagepack from globus_compute_common.messagepack.message_types import EPStatusReport from globus_compute_endpoint import engines +from pytest_mock import MockFixture logger = logging.getLogger(__name__) @@ -34,3 +36,56 @@ def test_status_reporting(engine_type, engine_runner, engine_heartbeat: int): message = q_msg["message"] report = messagepack.unpack(message) assert isinstance(report, EPStatusReport) + + +def test_gcengine_status_report(mocker: MockFixture, engine_runner: callable): + 
managers = [ + { + "manager": "f6826ed3c7cd", + "block_id": "0", + "worker_count": 12, + "tasks": 5, + "idle_duration": 0.0, + "active": True, + }, + { + "manager": "e6428de3c7ce", + "block_id": "1", + "worker_count": 12, + "tasks": 15, + "idle_duration": 0.0, + "active": False, + }, + { + "manager": "d63f7ee5c78d", + "block_id": "2", + "worker_count": 12, + "tasks": 0, + "idle_duration": 3.8, + "active": True, + }, + ] + mocker.patch.object( + parsl.HighThroughputExecutor, "connected_managers", lambda x: managers + ) + mocker.patch.object(parsl.HighThroughputExecutor, "outstanding", 25) + + engine: engines.GlobusComputeEngine = engine_runner(engines.GlobusComputeEngine) + status_report = engine.get_status_report() + info = status_report.global_state["info"] + + assert info["managers"] == 3 + assert info["active_managers"] == 2 + assert info["total_workers"] == 36 + assert info["idle_workers"] == 19 + assert info["pending_tasks"] == 5 + assert info["outstanding_tasks"] == 25 + assert info["scaling_enabled"] == (engine.executor.provider.max_blocks > 0) + assert info["mem_per_worker"] == engine.executor.mem_per_worker + assert info["cores_per_worker"] == engine.executor.cores_per_worker + assert info["prefetch_capacity"] == engine.executor.prefetch_capacity + assert info["max_blocks"] == engine.executor.provider.max_blocks + assert info["min_blocks"] == engine.executor.provider.min_blocks + assert info["max_workers_per_node"] == engine.executor.max_workers + assert info["nodes_per_block"] == engine.executor.provider.nodes_per_block + assert info["heartbeat_period"] == engine._heartbeat_period