Skip to content

Commit

Permalink
Merge pull request #6562 from grondo/relative-uris
Browse files Browse the repository at this point in the history
python: support relative and absolute path-like targets in jobid URI resolver
  • Loading branch information
mergify[bot] authored Jan 24, 2025
2 parents 01cad95 + f1c3ded commit 921ed27
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 40 deletions.
24 changes: 19 additions & 5 deletions doc/man1/flux-uri.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,21 @@ URI SCHEMES

The following URI schemes are included by default:

jobid:ID[/ID...]
jobid:PATH
This scheme attempts to get the URI for a Flux instance running as a
job in the current enclosing instance. This is the assumed scheme if no
``scheme:`` is provided in *TARGET* passed to :program:`flux uri`, so the
``jobid:`` prefix is optional. A hierarchy of Flux jobids is supported,
so ``f1234/f3456`` will resolve the URI for job ``f3456`` running in
job ``f1234`` in the current instance. This scheme will raise an error
if the target job is not running.
``jobid:`` prefix is optional. *PATH* is a hierarchical path expression
that may contain an optional leading slash (``/``) (which references
the top-level, root instance explicitly), followed by zero or more job
IDs separated by slashes. The special IDs ``.`` and ``..`` indicate
the current instance (within the hierarchy) and its parent, respectively.
This allows resolution of a single job running in the current instance
via ``f1234``, explicitly within the root instance via ``/f2345``, or
a job running within another job via ``f3456/f789``. Completely relative
paths can also be used such as ``..`` to get the URI of the current
parent, or ``../..`` to get the URI of the parent's parent. Finally,
a single slash (``/``) may be used to get the root instance URI.

The ``jobid`` scheme supports the optional query parameter ``?wait``, which
causes the resolver to wait until a URI has been posted to the job eventlog
Expand Down Expand Up @@ -150,6 +157,13 @@ Get the URI of a nested job:
the last component of the jobid "path" or hierarchy. This will resolve
each URI in turn as a local URI.

Get the URI of the root instance from within a job running at any depth:

::

$ flux uri /
local:///run/flux/local

Get the URI of a local flux-broker

::
Expand Down
100 changes: 68 additions & 32 deletions src/bindings/python/flux/uri/resolvers/jobid.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,13 @@
###############################################################

import os
from pathlib import PurePath
from pathlib import PurePosixPath

import flux
from flux.job import JobID, job_list_id
from flux.uri import JobURI, URIResolverPlugin, URIResolverURI


def filter_slash(iterable):
return list(filter(lambda x: "/" not in x, iterable))


def wait_for_uri(flux_handle, jobid):
"""Wait for memo event containing job uri, O/w finish event"""
for event in flux.job.event_watch(flux_handle, jobid):
Expand All @@ -30,41 +26,78 @@ def wait_for_uri(flux_handle, jobid):
return None


def resolve_parent(handle):
"""Return parent-uri if instance-level > 0, else local-uri"""
if int(handle.attr_get("instance-level")) > 0:
return handle.attr_get("parent-uri")
return handle.attr_get("local-uri")


def resolve_root(flux_handle):
"""Return the URI of the top-level, or root, instance."""
handle = flux_handle
while int(handle.attr_get("instance-level")) > 0:
handle = flux.Flux(resolve_parent(handle))
return handle.attr_get("local-uri")


def resolve_jobid(flux_handle, arg, wait):
try:
jobid = JobID(arg)
except OSError as exc:
raise ValueError(f"{arg} is not a valid jobid")

try:
if wait:
uri = wait_for_uri(flux_handle, jobid)
else:
# Fetch the jobinfo object for this job
job = job_list_id(
flux_handle, jobid, attrs=["state", "annotations"]
).get_jobinfo()
if job.state != "RUN":
raise ValueError(f"jobid {arg} is not running")
uri = job.user.uri
except FileNotFoundError as exc:
raise ValueError(f"jobid {arg} not found") from exc

if uri is None or str(uri) == "":
raise ValueError(f"URI not found for job {arg}")
return uri


class URIResolver(URIResolverPlugin):
"""A URI resolver that attempts to fetch the remote_uri for a job"""

def describe(self):
return "Get URI for a given Flux JOBID"

def _do_resolve(self, uri, flux_handle, force_local=False, wait=False):
def _do_resolve(
self, uri, flux_handle, force_local=False, wait=False, hostname=None
):
#
# Convert a possible hierarchy of jobids to a list, dropping any
# extraneous '/' (e.g. //id0/id1 -> [ "id0", "id1" ]
jobids = filter_slash(PurePath(uri.path).parts)
# Convert a possible hierarchy of jobids to a list
jobids = list(PurePosixPath(uri.path).parts)

# If path is empty, return current enclosing URI
if not jobids:
return flux_handle.attr_get("local-uri")

# Pop the first jobid off the list, this id should be local:
# Pop the first jobid off the list, if a jobid it should be local,
# otherwise "/" for the root URI or ".." for parent URI:
arg = jobids.pop(0)
try:
jobid = JobID(arg)
except OSError as exc:
raise ValueError(f"{arg} is not a valid jobid")

try:
if wait:
uri = wait_for_uri(flux_handle, jobid)
else:
# Fetch the jobinfo object for this job
job = job_list_id(
flux_handle, jobid, attrs=["state", "annotations"]
).get_jobinfo()
if job.state != "RUN":
raise ValueError(f"jobid {arg} is not running")
uri = job.user.uri
except FileNotFoundError as exc:
raise ValueError(f"jobid {arg} not found") from exc

if uri is None or str(uri) == "":
raise ValueError(f"URI not found for job {arg}")
if arg == "/":
uri = resolve_root(flux_handle)
elif arg == "..":
uri = resolve_parent(flux_handle)
# Relative paths always use a local:// uri. But, if a jobid was
# resolved earlier in the path, then use the hostname associated
# with that job.
if hostname:
uri = JobURI(uri, remote_hostname=hostname).remote
else:
uri = resolve_jobid(flux_handle, arg, wait)
hostname = JobURI(uri).netloc

# If there are more jobids in the hierarchy to resolve, resolve
# them recursively
Expand All @@ -74,7 +107,10 @@ def _do_resolve(self, uri, flux_handle, force_local=False, wait=False):
if force_local:
uri = JobURI(uri).local
return self._do_resolve(
resolver_uri, flux.Flux(uri), force_local=force_local
resolver_uri,
flux.Flux(uri),
force_local=force_local,
hostname=hostname,
)
return uri

Expand Down
14 changes: 11 additions & 3 deletions src/bindings/python/flux/uri/uri.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,34 @@ class JobURI(URI):
remote: If local URI, returns a remote URI substituting current hostname.
If a remote URI, returns the URI.
local: If a remote URI, convert to a local URI. Otherwise return the URI.
Args:
uri (str): The URI string with which to initialize the JobURI instance.
remote_hostname (str): If ``uri`` is a local URI, use the provided
hostname instead of the current hostname when rendering the remote
URI.
"""

force_local = os.environ.get("FLUX_URI_RESOLVE_LOCAL", False)

def __init__(self, uri):
def __init__(self, uri, remote_hostname=None):
super().__init__(uri)
if self.scheme == "":
raise ValueError(f"JobURI '{uri}' does not have a valid scheme")
self.path = re.sub("/+", "/", self.path)
self.remote_uri = None
self.local_uri = None
self.remote_hostname = remote_hostname

@property
def remote(self):
if not self.remote_uri:
if self.scheme == "ssh":
self.remote_uri = self.uri
elif self.scheme == "local":
hostname = platform.uname()[1]
self.remote_uri = f"ssh://{hostname}{self.path}"
if not self.remote_hostname:
self.remote_hostname = platform.uname()[1]
self.remote_uri = f"ssh://{self.remote_hostname}{self.path}"
else:
raise ValueError(
f"Cannot convert JobURI with scheme {self.scheme} to remote"
Expand Down
14 changes: 14 additions & 0 deletions t/python/t0025-uri.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ def test_parse_local(self):
self.assertEqual(uri.fragment, "")
self.assertEqual(uri.params, "")

def test_parse_local_with_remote_hostname(self):
hostname = "fakehost"
uri = JobURI("local:///tmp/foo", remote_hostname=hostname)
self.assertEqual(uri.uri, "local:///tmp/foo")
self.assertEqual(str(uri), "local:///tmp/foo")
self.assertEqual(uri.remote, f"ssh://{hostname}/tmp/foo")
self.assertEqual(uri.local, "local:///tmp/foo")
self.assertEqual(uri.scheme, "local")
self.assertEqual(uri.netloc, "")
self.assertEqual(uri.path, "/tmp/foo")
self.assertEqual(uri.query, "")
self.assertEqual(uri.fragment, "")
self.assertEqual(uri.params, "")

def test_parse_errors(self):
with self.assertRaises(ValueError):
JobURI("foo:///tmp/bar").remote
Expand Down
17 changes: 17 additions & 0 deletions t/t2802-uri-cmd.t
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,23 @@ test_expect_success 'flux uri resolves hierarchical jobids with ?local' '
test_debug "echo ${jobid}/${jobid2}?local is ${uri}"
'
test_expect_success 'flux uri works with relative paths' '
root_uri=$(FLUX_SSH=$testssh flux uri --local .) &&
job1_uri=$(FLUX_SSH=$testssh flux uri --local ${jobid}) &&
job2_uri=$(FLUX_SSH=$testssh flux uri --local ${jobid}/${jobid2}) &&
uri=$(FLUX_SSH=$testssh flux proxy $job2_uri flux uri /) &&
test_debug "echo flux uri / got ${uri} expected ${root_uri}" &&
test "$uri" = "$root_uri" &&
uri=$(FLUX_SSH=$testssh flux proxy $job2_uri flux uri ../..) &&
test_debug "echo flux uri ../.. got ${uri} expected ${root_uri}" &&
test "$uri" = "$root_uri" &&
uri=$(FLUX_SSH=$testssh flux proxy $job2_uri flux uri ..) &&
test_debug "echo flux uri .. got ${uri} expected ${job1_uri}" &&
test "$uri" = "$job1_uri" &&
uri=$(FLUX_SSH=$testssh flux proxy $job2_uri flux uri .) &&
test_debug "echo flux uri . got ${uri} expected ${job2_uri}" &&
test "$uri" = "$job2_uri"
'
test_expect_success 'flux uri --wait can resolve URI for pending job' '
uri=$(flux uri --wait $(flux batch -n1 --wrap hostname)) &&
flux job wait-event -vt 30 $(flux job last) clean &&
Expand Down

0 comments on commit 921ed27

Please sign in to comment.