Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revise support and worker state callbacks #17

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open

Conversation

JamesWrigley
Copy link
Collaborator

There's a few changes in here, I would recommend reviewing each commit individually. The big ones are:

  • Support for worker added/exiting/exited callbacks.
  • Revise support (requires callbacks to implement it properly).

Depends on timholy/Revise.jl#871.

@JamesWrigley JamesWrigley self-assigned this Dec 10, 2024
Copy link
Member

@jpsamaroo jpsamaroo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great stuff! I have some questions about the semantics of these callbacks w.r.t error conditions, but other than that, this all seems solid.

@@ -1185,7 +1276,7 @@ function deregister_worker(pg, pid)
# Notify the cluster manager of this workers death
manage(w.manager, w.id, w.config, :deregister)
if PGRP.topology !== :all_to_all || isclusterlazy()
for rpid in workers()
for rpid in filter(!=(myid()), workers())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the line above we do pg.workers = filter(x -> !(x.id == pid), pg.workers), isn't this sufficient to remove our pid (which is supposedly also pid) from the results of workers()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No that only removes pid (the ID of the dead worker) from the internal list. deregister_worker() is called on the live workers when a worker pid goes down, so when we recurse our own ID will still be in workers(). This didn't cause issues before because the function can handle being called recursively, but now it would cause the exited callbacks to be called twice.

src/cluster.jl Outdated
new_workers = @lock worker_lock addprocs_locked(manager::ClusterManager; kwargs...)
for worker in new_workers
for callback in values(worker_added_callbacks)
callback(worker)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If one of these callbacks throws, what should we do? Right now we'll just bail out of addprocs, but it might make sense to make it a non-fatal error (printed with @error)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I think we should definitely throw the exception so that it's obvious that worker initialization somehow failed, otherwise code that runs later assuming initialization succeeded may cause even more errors. But I did change how we run them in 90f44f6 so that they execute concurrently to not slow down addprocs() too much and so that we can have warnings about slow callbacks.

add_worker_added_callback(f::Base.Callable; key=nothing)

Register a callback to be called on the master process whenever a worker is
added. The callback will be called with the added worker ID,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should document the on-error behavior here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, fixed in 90f44f6.

src/cluster.jl Outdated
"""
remove_worker_added_callback(key)

Remove the callback for `key`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Remove the callback for `key`.
Remove the callback for `key` that was added via `add_worker_added_callback`.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 90f44f6.

src/cluster.jl Outdated

Register a callback to be called on the master process whenever a worker is
added. The callback will be called with the added worker ID,
e.g. `f(w::Int)`. Returns a unique key for the callback.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
e.g. `f(w::Int)`. Returns a unique key for the callback.
e.g. `f(w::Int)`. Chooses and returns a unique key for the callback
if `key` is not specified.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 90f44f6.

src/cluster.jl Outdated
Register a callback to be called on the master process when a worker has exited
for any reason (i.e. not only because of [`rmprocs()`](@ref) but also the worker
segfaulting etc). The callback will be called with the worker ID,
e.g. `f(w::Int)`. Returns a unique key for the callback.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
e.g. `f(w::Int)`. Returns a unique key for the callback.
e.g. `f(w::Int)`. Chooses and returns a unique key for the callback
if `key` is not specified.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 90f44f6.

src/cluster.jl Outdated
"""
remove_worker_exiting_callback(key)

Remove the callback for `key`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Remove the callback for `key`.
Remove the callback for `key` that was added via `add_worker_exiting_callback`.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 90f44f6.

src/cluster.jl Outdated
"""
remove_worker_exited_callback(key)

Remove the callback for `key`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Remove the callback for `key`.
Remove the callback for `key` that was added via `add_worker_exited_callback`.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 90f44f6.

src/cluster.jl Outdated
end

if timedwait(() -> all(istaskdone.(callback_tasks)), callback_timeout) === :timed_out
@warn "Some callbacks timed out, continuing to remove workers anyway"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@warn "Some callbacks timed out, continuing to remove workers anyway"
@warn "Some worker-exiting callbacks have not yet finished, continuing to remove workers anyway"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 90f44f6.

src/cluster.jl Outdated
# Call callbacks on the master
if myid() == 1
for callback in values(worker_exited_callbacks)
callback(pid)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try/catch -> @error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's better, added in 90f44f6.

@jpsamaroo jpsamaroo mentioned this pull request Dec 12, 2024
8 tasks
@jpsamaroo
Copy link
Member

One additional thought: should we have a way to detect a worker exit reason? If a worker exits due to a segfault or OOM, this might be important to know so that I can do some recovery actions or reporting to the user. It could also be something we add later, as its own set of callbacks, but I figure it's worth thinking about before merging this.

Previously we were not filtering out the current worker when calling
`deregister_worker()` on `workers()`.
@JamesWrigley
Copy link
Collaborator Author

One additional thought: should we have a way to detect a worker exit reason? If a worker exits due to a segfault or OOM, this might be important to know so that I can do some recovery actions or reporting to the user. It could also be something we add later, as its own set of callbacks, but I figure it's worth thinking about before merging this.

That is an excellent point 🤔 I think it should go with the current set of callbacks, e.g. what if the signature for the worker-exiting callbacks was f(w::Int, reason::ExitReasonEnum)?

@JBlaschke
Copy link

JBlaschke commented Dec 23, 2024

Btw, I think this is really cool stuff @JamesWrigley!

@jpsamaroo and I where discussing this in the context advanced scheduling (i.e. your compute might be growing or shrinking) -- or you're on shared nodes where the sysadmin might kill a worker which is leaking memory. I this context I've been building "nanny workers" that will re-add dead workers, etc. But this has been a pain in the ^%#@

I am happy with this PR as is, but fi there are spare cycles, I want propose additional improvements:

  1. Instead of the {added, exiting, exited} states, I propose using {starting, started, exiting, exited}. I.e follow the semantics of event => conclusion. So request was made to add a worker (starting) => the worker is ready for work (started); and request was made to shut down a worker (exiting) => worker is gone (exited). This brings the API in line with resource managers like Slurm (which have distinct "I've started doing something" and "I'm done doing that thing" phases).
  2. Is there a way to more effectively capture the state of an exiting worker? I.e. capturing the exit code is a step in the right direction, but perhaps we also want to capture a little more context that will help an application automatically recover from an unexpected failure (e.g. "working on dataset 23", or /path/to/coredump). I.e. have the user be able to define additional optional input args to some of the callbacks?

@JamesWrigley
Copy link
Collaborator Author

Thanks for taking a look :)

About those things:

  1. I don't have any objection to adding a started callback, but what would it be used for? I can't think of a situation where a library that isn't calling addprocs() itself (in which case it would already know which workers are being started) would need to know about a worker it can't connect to yet.

  2. In the case of the worker being killed gracefully I think this could be implemented already by an exiting callback to extract some status information from the worker. But if it died from e.g. a segfault/OOM it gets a bit more complicated 🤔 Checking for a coredump would involve adding another worker on the same node (which might not even be possible if its slurm allocation is expired), and that's a bit more side-effects than I'm comfortable with. Maybe we could pass the last ~100 lines of the logs to the callback?

    Alternatively, we could support some kind of per-worker status that a worker can set for itself (e.g. setmystatus("working on dataset 23")), and pass the last status to the exited callback?

@JBlaschke
Copy link

  • I don't have any objection to adding a started callback, but what would it be used for? I can't think of a situation where a library that isn't calling addprocs() itself (in which case it would already know which workers are being started) would need to know about a worker it can't connect to yet.

In general I find it harder to change an API after the fact ;) -- Anyway the situation you're describing is where one worker is responsible for managing the overall workflow, as is common. However this is not always the case at scale (eg. what if we had >10k workers?). In those situations you often lay out your workers on a tree with several "management" nodes (eg. think fat trees, but for workers and not networks). In this situation you want to build callbacks that notify those manager nodes that the leaves have just changed (or are about to change).

  • In the case of the worker being killed gracefully I think this could be implemented already by an exiting callback to extract some status information from the worker. But if it died from e.g. a segfault/OOM it gets a bit more complicated 🤔 Checking for a coredump would involve adding another worker on the same node (which might not even be possible if its slurm allocation is expired), and that's a bit more side-effects than I'm comfortable with. Maybe we could pass the last ~100 lines of the logs to the callback?

Slurm can be configured to send a signal N seconds before killing a process. We also have the ability to add hooks to Slurm jobs upon process startup/completion. Oh, and coredumps don't have to land in /tmp -- I recommend $SCRATCH for these things at NERSC. Things like passing back part of the Slurm log is definitely in the right direction. I was thinking of something that the users and sysadmins can configure for a given library / application though.

Alternatively, we could support some kind of per-worker status that a worker can set for itself (e.g. setmystatus("working on dataset 23")), and pass the last status to the exited callback?

That's neat! I love it -- it can also help with overall workflow tooling.

All-in-all I am happy with this PR as is -- my comments are meant to make somethings that's great even better.

@JamesWrigley
Copy link
Collaborator Author

Alrighty, switched to {starting, started, exiting, exited} callbacks in 8b04241 / 21da9fc.

Slurm can be configured to send a signal N seconds before killing a process. We also have the ability to add hooks to Slurm jobs upon process startup/completion. Oh, and coredumps don't have to land in /tmp -- I recommend $SCRATCH for these things at NERSC. Things like passing back part of the Slurm log is definitely in the right direction. I was thinking of something that the users and sysadmins can configure for a given library / application though.

On second thoughts I'm unsure about promising logs because I don't know how the RemoteLogger's thing will pan out: JuliaLang/Distributed.jl#94. We can always promise to send the stdout of the worker because we're guaranteed to get that, but it might not have any useful information.

Am I correct in thinking that worker statuses and the worker-exited callbacks are sufficient for the uses you're talking about? The way I think about it is that there's three possible scenarios for exits:

  1. Worker gets a request to exit (e.g. by slurm signal): it sets its status and then remotecalls the master to rmproc() itself. The worker-exited callback is called like f(pid, last_status, ExitKind_graceful).
  2. Worker is removed by the master: it may get a chance to set a final status if there's a worker-exiting callback that will tell it do so, either way the worker-exited callback gets called like f(pid, last_status, ExitKind_graceful).
  3. Worker dies suddenly (segfault/OOM etc): the worker-exited callback gets called like f(pid, last_status, ExitKind_forceful) and it can look for coredumps etc.

The new `WorkerState_exterminated` state is for indicating that a worker was
killed by something other than us.
@JamesWrigley
Copy link
Collaborator Author

Some updates:

  • Implement an interface for Distributed-like libraries timholy/Revise.jl#871 is merged and released now, but with a new minimum Julia version of 1.10 so I bumped the minimum Julia version for DistributedNext too in 914e50c.
  • I took the liberty of rebasing since the commit history was getting a bit long.
  • The worker-exited callbacks now get the worker state as well as their ID, which will tell them if the worker exited gracefully or not. As part of this I renamed the WorkerState instances and added a new state for forceful exits (WorkerState_exterminated) in d5fd837. The next thing to do is add support for worker statuses and pass those to the worker-exited callbacks, and then I think this will be ready to merge.

@codecov-commenter
Copy link

codecov-commenter commented Jan 2, 2025

Codecov Report

Attention: Patch coverage is 95.37037% with 5 lines in your changes missing coverage. Please review.

Project coverage is 88.26%. Comparing base (0cca4d3) to head (64aba00).

Files with missing lines Patch % Lines
src/cluster.jl 94.68% 5 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master      #17      +/-   ##
==========================================
+ Coverage   88.24%   88.26%   +0.01%     
==========================================
  Files          11       12       +1     
  Lines        2068     2138      +70     
==========================================
+ Hits         1825     1887      +62     
- Misses        243      251       +8     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

This should fix an exception seen in CI from the lingering timeout task:
```
 Test Summary:                                | Pass  Total  Time
Deserialization error recovery and include() |   11     11  3.9s
      From worker 4:	Unhandled Task ERROR: EOFError: read end of file
      From worker 4:	Stacktrace:
      From worker 4:	 [1] wait
      From worker 4:	   @ .\asyncevent.jl:159 [inlined]
      From worker 4:	 [2] sleep(sec::Float64)
      From worker 4:	   @ Base .\asyncevent.jl:265
      From worker 4:	 [3] (::DistributedNext.var"#34#37"{DistributedNext.Worker, Float64})()
      From worker 4:	   @ DistributedNext D:\a\DistributedNext.jl\DistributedNext.jl\src\cluster.jl:213
```
@JamesWrigley
Copy link
Collaborator Author

Alrighty, implemented worker statuses in 64aba00. Now they'll be passed to the worker-exited callbacks.

Apologies for how big this PR is becoming 😅 I've tried to keep the commits atomic so you can review them one-by-one.

@JamesWrigley
Copy link
Collaborator Author

Hmm, interestingly 0d5aaa3 seems to have almost entirely fixed #6. There are no more timeouts on Linux/OSX and I see only one on Windows.

The common problem with these hangs seems to be lingering tasks blocking Julia from exiting. At some point we should probably audit Distributed[Next] to remove all of them.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants