Populate GlobusComputeEngine status report
rjmello committed Oct 26, 2023
1 parent 510d62a commit 8be81b8
Showing 3 changed files with 190 additions and 39 deletions.
@@ -0,0 +1,6 @@
New Functionality
^^^^^^^^^^^^^^^^^

- Added support for endpoint status reports when using ``GlobusComputeEngine``.
The report includes information such as the total number of active workers,
idle workers, and pending tasks.
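
As a rough illustration of how these report fields relate to the per-manager data the engine already exposes, the standalone sketch below mirrors the arithmetic introduced in this commit. The sample manager records and the ``outstanding`` count are hypothetical; the dictionary keys (``worker_count``, ``tasks``, ``active``) follow those returned by ``connected_managers()`` in the diff and test below.

# Illustrative sketch only; mirrors the metric arithmetic added in this commit.
# The manager records below are hypothetical sample data.
managers = [
    {"manager": "aaa", "worker_count": 4, "tasks": 1, "active": True},
    {"manager": "bbb", "worker_count": 4, "tasks": 6, "active": False},
]
outstanding = 9  # all tasks known to the executor (executor.outstanding)

total_managers = len(managers)
active_managers = sum(1 for m in managers if m["active"])
total_workers = sum(m["worker_count"] for m in managers)
idle_workers = sum(max(0, m["worker_count"] - m["tasks"]) for m in managers)
# Pending tasks are those still queued in the interchange, i.e. not yet
# dispatched to any manager.
pending_tasks = outstanding - sum(m["tasks"] for m in managers)

print(total_managers, active_managers, total_workers, idle_workers, pending_tasks)
# 2 1 8 3 2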
168 changes: 129 additions & 39 deletions compute_endpoint/globus_compute_endpoint/engines/globus_compute.py
@@ -87,32 +87,87 @@ def _submit(
def provider(self):
return self.executor.provider

def get_outstanding_breakdown(self) -> t.List[t.Tuple[str, int, bool]]:
def get_connected_managers(self) -> t.List[t.Dict[str, t.Any]]:
"""
Returns
-------
List of dicts containing info for all connected managers
"""
return self.executor.connected_managers()

def get_total_managers(self, managers: t.List[t.Dict[str, t.Any]]) -> int:
"""
Parameters
----------
managers: list[dict[str, Any]]
List of dicts containing info for all connected managers
Returns
-------
Total number of managers
"""
return len(managers)

def get_total_active_managers(self, managers: t.List[t.Dict[str, t.Any]]) -> int:
"""
Parameters
----------
managers: list[dict[str, Any]]
List of dicts containing info for all connected managers
Returns
-------
Number of managers that have capacity for new tasks
"""
count = 0
for mgr in managers:
if mgr["active"]:
count += 1
return count

def get_outstanding_breakdown(
self, managers: t.Optional[t.List[t.Dict[str, t.Any]]] = None
) -> t.List[t.Tuple[str, int, bool]]:
"""
Parameters
----------
managers: list[dict[str, Any]] | None
List of dicts containing info for all connected managers
Returns
-------
List of tuples of the form (component, # of tasks on component, active?)
"""
if managers is None:
managers = self.get_connected_managers()
total_task_count = self.executor.outstanding
manager_info: t.List[t.Dict[str, t.Any]] = self.executor.connected_managers()
breakdown = [(m["manager"], m["tasks"], m["active"]) for m in manager_info]
total_count_managers = sum([m["tasks"] for m in manager_info])
breakdown = [(m["manager"], m["tasks"], m["active"]) for m in managers]
total_count_managers = sum([m["tasks"] for m in managers])
task_count_interchange = total_task_count - total_count_managers
breakdown = [("interchange", task_count_interchange, True)] + breakdown
return breakdown

def get_total_tasks_outstanding(self):
def get_total_tasks_outstanding(self) -> dict:
"""
Returns
-------
Returns a dict of type {str_task_type: count_tasks}
Dict of type {str_task_type: count_tasks}
"""
return {"RAW": self.executor.outstanding}

def get_total_tasks_pending(self, managers: t.List[t.Dict[str, t.Any]]) -> int:
"""
Parameters
----------
managers: list[dict[str, Any]]
List of dicts containing info for all connected managers
Returns
-------
Total number of pending tasks
"""
outstanding = self.get_outstanding_breakdown()
total = sum([component[1] for component in outstanding])
return {"RAW": total}
outstanding = self.get_outstanding_breakdown(managers=managers)
return outstanding[0][1] # Queued in interchange

def provider_status(self):
status = []
@@ -122,10 +177,40 @@ def provider_status(self):
status = self.provider.status(job_ids=job_ids)
return status

def get_total_live_workers(self) -> int:
manager_info: t.List[dict[str, t.Any]] = self.executor.connected_managers()
worker_count = sum([mgr["worker_count"] for mgr in manager_info])
return worker_count
def get_total_live_workers(
self, managers: t.Optional[t.List[t.Dict[str, t.Any]]] = None
) -> int:
"""
Parameters
----------
managers: list[dict[str, Any]]
List of dicts containing info for all connected managers
Returns
-------
Total number of live workers
"""
if managers is None:
managers = self.get_connected_managers()
return sum([mgr["worker_count"] for mgr in managers])

def get_total_idle_workers(self, managers: t.List[t.Dict[str, t.Any]]) -> int:
"""
Parameters
----------
managers: list[dict[str, Any]]
List of dicts containing info for all connected managers
Returns
-------
Total number of workers that are not actively running tasks
"""
idle_workers = 0
for mgr in managers:
workers = mgr["worker_count"]
tasks = mgr["tasks"]
idle_workers += max(0, workers - tasks)
return idle_workers

def scale_out(self, blocks: int):
logger.info(f"Scaling out {blocks} blocks")
@@ -136,41 +221,46 @@ def scale_in(self, blocks: int):
to_kill = list(self.executor.blocks.values())[:blocks]
return self.provider.cancel(to_kill)

@property
def scaling_enabled(self) -> bool:
"""Indicates whether scaling is possible"""
max_blocks = self.executor.provider.max_blocks
return max_blocks > 0

def get_status_report(self) -> EPStatusReport:
"""
endpoint_id: uuid.UUID
ep_status_report: t.Dict[str, t.Any]
task_statuses: t.Dict[str, t.List[TaskTransition]]
Returns
-------
Object containing info on the current status of the endpoint
"""
managers = self.get_connected_managers()
executor_status: t.Dict[str, t.Any] = {
"task_id": -2,
"task_id": -2, # Deprecated
"info": {
"total_cores": 0,
"total_mem": 0,
"new_core_hrs": 0,
"total_core_hrs": 0,
"managers": 0,
"active_managers": 0,
"total_workers": 0,
"idle_workers": 0,
"pending_tasks": 0,
"outstanding_tasks": 0,
"worker_mode": 0,
"scheduler_mode": 0,
"scaling_enabled": False,
"mem_per_worker": 0,
"cores_per_worker": 0,
"prefetch_capacity": 0,
"max_blocks": 1,
"min_blocks": 1,
"max_workers_per_node": 0,
"nodes_per_block": 1,
"total_cores": 0, # TODO
"total_mem": 0, # TODO
"new_core_hrs": 0, # TODO
"total_core_hrs": 0, # TODO
"managers": self.get_total_managers(managers=managers),
"active_managers": self.get_total_active_managers(managers=managers),
"total_workers": self.get_total_live_workers(managers=managers),
"idle_workers": self.get_total_idle_workers(managers=managers),
"pending_tasks": self.get_total_tasks_pending(managers=managers),
"outstanding_tasks": self.get_total_tasks_outstanding()["RAW"],
"worker_mode": 0, # Deprecated
"scheduler_mode": 0, # Deprecated
"scaling_enabled": self.scaling_enabled,
"mem_per_worker": self.executor.mem_per_worker,
"cores_per_worker": self.executor.cores_per_worker,
"prefetch_capacity": self.executor.prefetch_capacity,
"max_blocks": self.executor.provider.max_blocks,
"min_blocks": self.executor.provider.min_blocks,
"max_workers_per_node": self.executor.max_workers,
"nodes_per_block": self.executor.provider.nodes_per_block,
"heartbeat_period": self._heartbeat_period,
},
}
task_status_deltas: t.Dict[str, t.List[TaskTransition]] = {}
task_status_deltas: t.Dict[str, t.List[TaskTransition]] = {} # TODO
return EPStatusReport(
endpoint_id=self.endpoint_id,
global_state=executor_status,
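
As a usage note, here is a minimal sketch of how a caller might poll the engine for the report populated above on its heartbeat period. This is not part of the commit; ``engine`` is assumed to be an already-started ``GlobusComputeEngine`` and ``logger`` a standard ``logging.Logger``, while the field names follow the ``info`` dict built in ``get_status_report``.

import time

def log_status(engine, logger):
    # Sketch only: fetch the report populated by get_status_report() and log
    # the newly reported worker/task metrics once per heartbeat period.
    while True:
        report = engine.get_status_report()
        info = report.global_state["info"]
        logger.info(
            "managers=%s active=%s workers=%s idle=%s pending=%s outstanding=%s",
            info["managers"],
            info["active_managers"],
            info["total_workers"],
            info["idle_workers"],
            info["pending_tasks"],
            info["outstanding_tasks"],
        )
        time.sleep(info["heartbeat_period"])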
55 changes: 55 additions & 0 deletions compute_endpoint/tests/unit/test_status_reporting.py
@@ -1,9 +1,11 @@
import logging

import parsl
import pytest
from globus_compute_common import messagepack
from globus_compute_common.messagepack.message_types import EPStatusReport
from globus_compute_endpoint import engines
from pytest_mock import MockFixture

logger = logging.getLogger(__name__)

@@ -34,3 +36,56 @@ def test_status_reporting(engine_type, engine_runner, engine_heartbeat: int):
message = q_msg["message"]
report = messagepack.unpack(message)
assert isinstance(report, EPStatusReport)


def test_gcengine_status_report(mocker: MockFixture, engine_runner: callable):
managers = [
{
"manager": "f6826ed3c7cd",
"block_id": "0",
"worker_count": 12,
"tasks": 5,
"idle_duration": 0.0,
"active": True,
},
{
"manager": "e6428de3c7ce",
"block_id": "1",
"worker_count": 12,
"tasks": 15,
"idle_duration": 0.0,
"active": False,
},
{
"manager": "d63f7ee5c78d",
"block_id": "2",
"worker_count": 12,
"tasks": 0,
"idle_duration": 3.8,
"active": True,
},
]
mocker.patch.object(
parsl.HighThroughputExecutor, "connected_managers", lambda x: managers
)
mocker.patch.object(parsl.HighThroughputExecutor, "outstanding", 25)

engine: engines.GlobusComputeEngine = engine_runner(engines.GlobusComputeEngine)
status_report = engine.get_status_report()
info = status_report.global_state["info"]

assert info["managers"] == 3
assert info["active_managers"] == 2
assert info["total_workers"] == 36
assert info["idle_workers"] == 19
assert info["pending_tasks"] == 5
assert info["outstanding_tasks"] == 25
assert info["scaling_enabled"] == engine.executor.provider.max_blocks > 0
assert info["mem_per_worker"] == engine.executor.mem_per_worker
assert info["cores_per_worker"] == engine.executor.cores_per_worker
assert info["prefetch_capacity"] == engine.executor.prefetch_capacity
assert info["max_blocks"] == engine.executor.provider.max_blocks
assert info["min_blocks"] == engine.executor.provider.min_blocks
assert info["max_workers_per_node"] == engine.executor.max_workers
assert info["nodes_per_block"] == engine.executor.provider.nodes_per_block
assert info["heartbeat_period"] == engine._heartbeat_period
