Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that each test closes all file descriptors #1768

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions compute_endpoint/globus_compute_endpoint/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,9 @@ def disable_on_boot_cmd(ep_dir: pathlib.Path):
def cli_run():
"""Entry point for setuptools to point to"""
app()
setup_logging() # reset


if __name__ == "__main__":
app()
setup_logging() # reset
48 changes: 48 additions & 0 deletions compute_endpoint/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import random
import signal
import string
import subprocess
import threading
import time
import typing as t
import uuid
from queue import Queue

import globus_compute_sdk as gc
import globus_sdk
import psutil
import pytest
import responses
from globus_compute_endpoint import engines
Expand Down Expand Up @@ -234,3 +237,48 @@ def _warned(msg: str) -> bool:
return test

assert any(_warned(str(w)) for w in pyt_w.list)


@pytest.fixture(autouse=True)
def resource_watcher():
p = psutil.Process()
ls_args = ("/bin/ls", "-lv", "--full-time", "--color=always", f"/proc/{p.pid}/fd/")
vm_beg = psutil.virtual_memory()
with p.oneshot():
mem_beg = p.memory_info()
fds_beg = p.num_fds()
thread_beg = p.num_threads()
ctx_beg = p.num_ctx_switches()
io_beg = getattr(p, "io_counters", lambda: "(not supported on this system)")()
ls_fds = subprocess.run(ls_args, capture_output=True)
os_fds_view_beg = ls_fds.stdout.decode()

yield

vm_end = psutil.virtual_memory()
with p.oneshot():
mem_end = p.memory_info()
fds_end = p.num_fds()
thread_end = p.num_threads()
ctx_end = p.num_ctx_switches()
io_end = getattr(p, "io_counters", lambda: "(not supported on this system)")()
ls_fds = subprocess.run(ls_args, capture_output=True)
os_fds_view_end = ls_fds.stdout.decode()

if fds_end > fds_beg:
thread_list = "\n ".join(
f"{i:>3}: {repr(t)}" for i, t in enumerate(threading.enumerate(), start=1)
)
msg = (
f"\nSystem Virtual Memory:\n {vm_beg=}\n {vm_end=}"
f"\n\nProcess Memory:\n {mem_beg=}\n {mem_end=}"
f"\n\nThread count:\n {thread_beg=}\n {thread_end=}"
f"\n\nContext Switches:\n {ctx_beg=}\n {ctx_end=}"
f"\n\nI/O Counters:\n {io_beg=}\n {io_end=}"
f"\n\nFile Descriptors:\n {fds_beg=}\n {fds_end=}"
f"\n\nThreads (count: {p.num_threads()}):\n {thread_list}"
f"\n\nOpen files (Before): {os_fds_view_beg}"
f"\nOpen files (After): {os_fds_view_end}"
)
msg = msg.replace("\n", "\n | ")
assert fds_end <= fds_beg, f"Left over file descriptors!!\n{msg}"
3 changes: 3 additions & 0 deletions compute_endpoint/tests/unit/test_cli_behavior.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from globus_compute_endpoint.endpoint.config.utils import load_config_yaml
from globus_compute_endpoint.endpoint.endpoint import Endpoint
from globus_compute_endpoint.engines import ThreadPoolEngine
from globus_compute_endpoint.logging_config import setup_logging
from globus_compute_sdk.sdk.auth.auth_client import ComputeAuthClient
from globus_compute_sdk.sdk.auth.globus_app import UserApp
from globus_compute_sdk.sdk.compute_dir import ensure_compute_dir
Expand Down Expand Up @@ -138,6 +139,7 @@ def func(argline, *, assert_exit_code: int | None = 0, stdin=None):
if stdin is None:
stdin = "{}" # silence some logs; incurred by invoke's sys.stdin choice
result = cli_runner.invoke(app, args, input=stdin)
setup_logging()
if assert_exit_code is not None:
assert result.exit_code == assert_exit_code, (result.stdout, result.stderr)
return result
Expand Down Expand Up @@ -202,6 +204,7 @@ def test_get_globus_app_with_scopes(mocker: MockFixture):
for scope_list in app.scope_requirements.values():
for scope in scope_list:
scopes.append(str(scope))
app.config.token_storage.close() # is a SQLiteStorage object

assert len(scopes) > 0
assert all(str(s) in scopes for s in WebClient.default_scope_requirements)
Expand Down
4 changes: 3 additions & 1 deletion compute_sdk/globus_compute_sdk/sdk/auth/globus_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
def get_globus_app(environment: str | None = None) -> GlobusApp:
app_name = platform.node()
client_id, client_secret = get_client_creds()
config = GlobusAppConfig(token_storage=get_token_storage(environment=environment))
token_storage = get_token_storage(environment=environment)
config = GlobusAppConfig(token_storage=token_storage)

if client_id and client_secret:
return ClientApp(
Expand All @@ -24,6 +25,7 @@ def get_globus_app(environment: str | None = None) -> GlobusApp:
)

elif client_secret:
token_storage.close()
raise ValueError(
"Both GLOBUS_COMPUTE_CLIENT_ID and GLOBUS_COMPUTE_CLIENT_SECRET must "
"be set to use a client identity. Either set both environment "
Expand Down
Loading