From 87793726027100ce7af9a03dd136c11eb4e89d9a Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 10 Dec 2024 23:07:28 +0100 Subject: [PATCH 1/9] Don't recursively call deregister_worker() on the current worker Previously we were not filtering out the current worker when calling `deregister_worker()` on `workers()`. --- src/cluster.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.jl b/src/cluster.jl index 10c53ac..7480cfe 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -1208,7 +1208,7 @@ function deregister_worker(pg, pid) # Notify the cluster manager of this workers death manage(w.manager, w.id, w.config, :deregister) if PGRP.topology !== :all_to_all || isclusterlazy() - for rpid in workers() + for rpid in filter(!=(myid()), workers()) try remote_do(deregister_worker, rpid, pid) catch From d5fd8373fd0aa113af42a65461dcb4e738b0ab1f Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Thu, 2 Jan 2025 21:55:34 +0100 Subject: [PATCH 2/9] Rename the WorkerState instances and add an exterminated state The new `WorkerState_exterminated` state is for indicating that a worker was killed by something other than us. --- src/cluster.jl | 45 +++++++++++++++++++++++++++-------------- src/messages.jl | 2 +- src/process_messages.jl | 6 +++--- 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/src/cluster.jl b/src/cluster.jl index 7480cfe..ba978fb 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -92,7 +92,15 @@ mutable struct WorkerConfig end end -@enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED W_UNKNOWN_STATE +@enum WorkerState begin + WorkerState_created + WorkerState_connected + WorkerState_terminating # rmprocs() has been called on the worker + WorkerState_terminated # Worker was gracefully removed + WorkerState_exterminated # Worker was forcefully removed (not by us) + WorkerState_unknown +end + mutable struct Worker id::Int msg_lock::Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag @@ -123,7 +131,7 @@ mutable struct Worker w.manager = manager w.config = config w.version = version - set_worker_state(w, W_CONNECTED) + set_worker_state(w, WorkerState_connected) register_worker_streams(w) w end @@ -134,7 +142,7 @@ mutable struct Worker if haskey(map_pid_wrkr, id) return map_pid_wrkr[id] end - w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Threads.Condition(), time(), conn_func) + w=new(id, Threads.ReentrantLock(), [], [], false, WorkerState_created, Threads.Condition(), time(), conn_func) w.initialized = Event() register_worker(w) w @@ -150,8 +158,15 @@ function set_worker_state(w, state) end end +# Helper function to check if a worker is dead or not. It's recommended to use +# this instead of checking Worker.state manually. 
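+# A worker counts as dead once it has reached a terminal state, i.e. either
+# WorkerState_terminated (gracefully removed) or WorkerState_exterminated
+# (killed by something other than us).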
+function is_worker_dead(w::Worker) + state = @atomic w.state + return state === WorkerState_terminated || state === WorkerState_exterminated +end + function check_worker_state(w::Worker) - if (@atomic w.state) === W_CREATED + if (@atomic w.state) === WorkerState_created if !isclusterlazy() if PGRP.topology === :all_to_all # Since higher pids connect with lower pids, the remote worker @@ -190,7 +205,7 @@ function exec_conn_func(w::Worker) end function wait_for_conn(w) - if (@atomic w.state) === W_CREATED + if (@atomic w.state) === WorkerState_created timeout = worker_timeout() - (time() - w.ct_time) timeout <= 0 && error("peer $(w.id) has not connected to $(myid())") @@ -203,7 +218,7 @@ function wait_for_conn(w) errormonitor(T) lock(w.c_state) do wait(w.c_state) - (@atomic w.state) === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds") + (@atomic w.state) === WorkerState_created && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds") end end nothing @@ -666,7 +681,7 @@ function create_worker(manager, wconfig) if (jw.id != 1) && (jw.id < w.id) lock(jw.c_state) do # wait for wl to join - if (@atomic jw.state) === W_CREATED + if (@atomic jw.state) === WorkerState_created wait(jw.c_state) end end @@ -693,7 +708,7 @@ function create_worker(manager, wconfig) for wl in wlist lock(wl.c_state) do - if (@atomic wl.state) === W_CREATED + if (@atomic wl.state) === WorkerState_created # wait for wl to join wait(wl.c_state) end @@ -900,7 +915,7 @@ function nprocs() n = length(PGRP.workers) # filter out workers in the process of being setup/shutdown. for jw in PGRP.workers - if !isa(jw, LocalProcess) && ((@atomic jw.state) !== W_CONNECTED) + if !isa(jw, LocalProcess) && ((@atomic jw.state) !== WorkerState_connected) n = n - 1 end end @@ -953,7 +968,7 @@ julia> procs() function procs() if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy()) # filter out workers in the process of being setup/shutdown. - return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)] + return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)] else return Int[x.id for x in PGRP.workers] end @@ -970,7 +985,7 @@ other_procs() = filter(!=(myid()), procs()) function id_in_procs(id) # faster version of `id in procs()` if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy()) for x in PGRP.workers - if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === W_CONNECTED) + if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === WorkerState_connected) return true end end @@ -994,7 +1009,7 @@ See also [`other_procs()`](@ref). 
""" function procs(pid::Integer) if myid() == 1 - all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)] + all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)] if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager)) Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)] else @@ -1103,7 +1118,7 @@ function _rmprocs(pids, waitfor) else if haskey(map_pid_wrkr, p) w = map_pid_wrkr[p] - set_worker_state(w, W_TERMINATING) + set_worker_state(w, WorkerState_terminating) kill(w.manager, p, w.config) push!(rmprocset, w) end @@ -1112,11 +1127,11 @@ function _rmprocs(pids, waitfor) start = time_ns() while (time_ns() - start) < waitfor*1e9 - all(w -> (@atomic w.state) === W_TERMINATED, rmprocset) && break + all(is_worker_dead, rmprocset) && break sleep(min(0.1, waitfor - (time_ns() - start)/1e9)) end - unremoved = [wrkr.id for wrkr in filter(w -> (@atomic w.state) !== W_TERMINATED, rmprocset)] + unremoved = [wrkr.id for wrkr in filter(!is_worker_dead, rmprocset)] if length(unremoved) > 0 estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.") throw(ErrorException(estr)) diff --git a/src/messages.jl b/src/messages.jl index 6e895f0..aaa8642 100644 --- a/src/messages.jl +++ b/src/messages.jl @@ -194,7 +194,7 @@ end function flush_gc_msgs() try for w in (PGRP::ProcessGroup).workers - if isa(w,Worker) && ((@atomic w.state) == W_CONNECTED) && w.gcflag + if isa(w,Worker) && ((@atomic w.state) == WorkerState_connected) && w.gcflag flush_gc_msgs(w) end end diff --git a/src/process_messages.jl b/src/process_messages.jl index a444651..13c4a62 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -210,7 +210,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) handle_msg(msg, header, r_stream, w_stream, version) end catch e - oldstate = W_UNKNOWN_STATE + oldstate = WorkerState_unknown # Check again as it may have been set in a message handler but not propagated to the calling block above if wpid < 1 @@ -223,7 +223,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) elseif !(wpid in map_del_wrkr) werr = worker_from_id(wpid) oldstate = @atomic werr.state - set_worker_state(werr, W_TERMINATED) + set_worker_state(werr, oldstate != WorkerState_terminating ? WorkerState_exterminated : WorkerState_terminated) # If unhandleable error occurred talking to pid 1, exit if wpid == 1 @@ -243,7 +243,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) close(w_stream) if (myid() == 1) && (wpid > 1) - if oldstate != W_TERMINATING + if oldstate != WorkerState_terminating println(stderr, "Worker $wpid terminated.") rethrow() end From 764ceecc3d3289933ec3399329faa01600799328 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 10 Dec 2024 23:13:00 +0100 Subject: [PATCH 3/9] Add support for worker state callbacks --- docs/src/_changelog.md | 1 + docs/src/index.md | 13 +++ src/cluster.jl | 214 ++++++++++++++++++++++++++++++++++++--- test/distributed_exec.jl | 70 +++++++++++++ 4 files changed, 284 insertions(+), 14 deletions(-) diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md index 5d9207f..bce0c98 100644 --- a/docs/src/_changelog.md +++ b/docs/src/_changelog.md @@ -18,6 +18,7 @@ This documents notable changes in DistributedNext.jl. The format is based on incompatibilities from both libraries being used simultaneously ([#10]). 
- [`other_workers()`](@ref) and [`other_procs()`](@ref) were implemented and exported ([#18]). +- Implemented callback support for workers being added/removed etc ([#17]). ## [v1.0.0] - 2024-12-02 diff --git a/docs/src/index.md b/docs/src/index.md index 41fba38..e0cf6cf 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -52,6 +52,19 @@ DistributedNext.cluster_cookie() DistributedNext.cluster_cookie(::Any) ``` +## Callbacks + +```@docs +DistributedNext.add_worker_starting_callback +DistributedNext.remove_worker_starting_callback +DistributedNext.add_worker_started_callback +DistributedNext.remove_worker_started_callback +DistributedNext.add_worker_exiting_callback +DistributedNext.remove_worker_exiting_callback +DistributedNext.add_worker_exited_callback +DistributedNext.remove_worker_exited_callback +``` + ## Cluster Manager Interface This interface provides a mechanism to launch and manage Julia workers on different cluster environments. diff --git a/src/cluster.jl b/src/cluster.jl index ba978fb..c0d616e 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -472,20 +472,28 @@ end ``` """ function addprocs(manager::ClusterManager; kwargs...) + params = merge(default_addprocs_params(manager), Dict{Symbol, Any}(kwargs)) + init_multi() cluster_mgmt_from_master_check() - lock(worker_lock) - try - addprocs_locked(manager::ClusterManager; kwargs...) - finally - unlock(worker_lock) - end + # Call worker-starting callbacks + warning_interval = params[:callback_warning_interval] + _run_callbacks_concurrently("worker-starting", worker_starting_callbacks, + warning_interval, [(manager, params)]) + + # Add new workers + new_workers = @lock worker_lock addprocs_locked(manager::ClusterManager, params) + + # Call worker-started callbacks + _run_callbacks_concurrently("worker-started", worker_started_callbacks, + warning_interval, new_workers) + + return new_workers end -function addprocs_locked(manager::ClusterManager; kwargs...) - params = merge(default_addprocs_params(manager), Dict{Symbol,Any}(kwargs)) +function addprocs_locked(manager::ClusterManager, params) topology(Symbol(params[:topology])) if PGRP.topology !== :all_to_all @@ -572,7 +580,8 @@ default_addprocs_params() = Dict{Symbol,Any}( :exeflags => ``, :env => [], :enable_threaded_blas => false, - :lazy => true) + :lazy => true, + :callback_warning_interval => 10) function setup_launched_worker(manager, wconfig, launched_q) @@ -870,6 +879,10 @@ const HDR_COOKIE_LEN=16 const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}() const map_sock_wrkr = IdDict() const map_del_wrkr = Set{Int}() +const worker_starting_callbacks = Dict{Any, Base.Callable}() +const worker_started_callbacks = Dict{Any, Base.Callable}() +const worker_exiting_callbacks = Dict{Any, Base.Callable}() +const worker_exited_callbacks = Dict{Any, Base.Callable}() # whether process is a master or worker in a distributed setup myrole() = LPROCROLE[] @@ -877,6 +890,149 @@ function myrole!(proctype::Symbol) LPROCROLE[] = proctype end +# Callbacks + +function _run_callbacks_concurrently(callbacks_name, callbacks_dict, warning_interval, arglist) + callback_tasks = Dict{Any, Task}() + for args in arglist + for (name, callback) in callbacks_dict + callback_tasks[name] = Threads.@spawn callback(args...) 
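+            # Each callback gets its own task so that one slow callback cannot
+            # delay the others; all tasks are waited on below.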
+ end + end + + running_callbacks = () -> ["'$(key)'" for (key, task) in callback_tasks if !istaskdone(task)] + while timedwait(() -> isempty(running_callbacks()), warning_interval) === :timed_out + callbacks_str = join(running_callbacks(), ", ") + @warn "Waiting for these $(callbacks_name) callbacks to finish: $(callbacks_str)" + end + + # Wait on the tasks so that exceptions bubble up + wait.(values(callback_tasks)) +end + +function _add_callback(f, key, dict; arg_types=Tuple{Int}) + desired_signature = "f(" * join(["::$(t)" for t in arg_types.types], ", ") * ")" + + if !hasmethod(f, arg_types) + throw(ArgumentError("Callback function is invalid, it must be able to be called with these argument types: $(desired_signature)")) + elseif haskey(dict, key) + throw(ArgumentError("A callback function with key '$(key)' already exists")) + end + + if isnothing(key) + key = Symbol(gensym(), nameof(f)) + end + + dict[key] = f + return key +end + +_remove_callback(key, dict) = delete!(dict, key) + +""" + add_worker_starting_callback(f::Base.Callable; key=nothing) + +Register a callback to be called on the master process immediately before new +workers are started. The callback `f` will be called with the `ClusterManager` +instance that is being used and a dictionary of parameters related to adding +workers, i.e. `f(manager, params)`. The `params` dictionary is specific to the +`manager` type. Note that the `LocalManager` and `SSHManager` cluster managers +in DistributedNext are not fully documented yet, see the +[managers.jl](https://github.com/JuliaParallel/DistributedNext.jl/blob/master/src/managers.jl) +file for their definitions. + +!!! warning + Adding workers can fail so it is not guaranteed that the workers requested + will exist. + +The worker-starting callbacks will be executed concurrently. If one throws an +exception it will not be caught and will bubble up through [`addprocs`](@ref). + +Keep in mind that the callbacks will add to the time taken to launch workers; so +try to either keep the callbacks fast to execute, or do the actual work +asynchronously by spawning a task in the callback (beware of race conditions if +you do this). +""" +add_worker_starting_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_starting_callbacks; + arg_types=Tuple{ClusterManager, Dict}) +""" + remove_worker_starting_callback(key) + +Remove the callback for `key` that was added with [`add_worker_starting_callback()`](@ref). +""" +remove_worker_starting_callback(key) = _remove_callback(key, worker_starting_callbacks) + +""" + add_worker_started_callback(f::Base.Callable; key=nothing) + +Register a callback to be called on the master process whenever a worker is +added. The callback will be called with the added worker ID, +e.g. `f(w::Int)`. Chooses and returns a unique key for the callback if `key` is +not specified. + +The worker-started callbacks will be executed concurrently. If one throws an +exception it will not be caught and will bubble up through [`addprocs()`](@ref). + +Keep in mind that the callbacks will add to the time taken to launch workers; so +try to either keep the callbacks fast to execute, or do the actual +initialization asynchronously by spawning a task in the callback (beware of race +conditions if you do this). +""" +add_worker_started_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_started_callbacks) + +""" + remove_worker_started_callback(key) + +Remove the callback for `key` that was added with [`add_worker_started_callback()`](@ref). 
+""" +remove_worker_started_callback(key) = _remove_callback(key, worker_started_callbacks) + +""" + add_worker_exiting_callback(f::Base.Callable; key=nothing) + +Register a callback to be called on the master process immediately before a +worker is removed with [`rmprocs()`](@ref). The callback will be called with the +worker ID, e.g. `f(w::Int)`. Chooses and returns a unique key for the callback +if `key` is not specified. + +All worker-exiting callbacks will be executed concurrently and if they don't +all finish before the `callback_timeout` passed to `rmprocs()` then the process +will be removed anyway. +""" +add_worker_exiting_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exiting_callbacks) + +""" + remove_worker_exiting_callback(key) + +Remove the callback for `key` that was added with [`add_worker_exiting_callback()`](@ref). +""" +remove_worker_exiting_callback(key) = _remove_callback(key, worker_exiting_callbacks) + +""" + add_worker_exited_callback(f::Base.Callable; key=nothing) + +Register a callback to be called on the master process when a worker has exited +for any reason (i.e. not only because of [`rmprocs()`](@ref) but also the worker +segfaulting etc). Chooses and returns a unique key for the callback if `key` is +not specified. + +The callback will be called with the worker ID and the final +`Distributed.WorkerState` of the worker, e.g. `f(w::Int, state)`. `state` is an +enum, a value of `WorkerState_terminated` means a graceful exit and a value of +`WorkerState_exterminated` means the worker died unexpectedly. + +If the callback throws an exception it will be caught and printed. +""" +add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exited_callbacks; + arg_types=Tuple{Int, WorkerState}) + +""" + remove_worker_exited_callback(key) + +Remove the callback for `key` that was added with [`add_worker_exited_callback()`](@ref). +""" +remove_worker_exited_callback(key) = _remove_callback(key, worker_exited_callbacks) + # cluster management related API """ myid() @@ -1063,7 +1219,7 @@ function cluster_mgmt_from_master_check() end """ - rmprocs(pids...; waitfor=typemax(Int)) + rmprocs(pids...; waitfor=typemax(Int), callback_timeout=10) Remove the specified workers. Note that only process 1 can add or remove workers. @@ -1077,6 +1233,10 @@ Argument `waitfor` specifies how long to wait for the workers to shut down: returned. The user should call [`wait`](@ref) on the task before invoking any other parallel calls. +The `callback_timeout` specifies how long to wait for any callbacks to execute +before continuing to remove the workers (see +[`add_worker_exiting_callback()`](@ref)). + # Examples ```julia-repl \$ julia -p 5 @@ -1093,24 +1253,38 @@ julia> workers() 6 ``` """ -function rmprocs(pids...; waitfor=typemax(Int)) +function rmprocs(pids...; waitfor=typemax(Int), callback_timeout=10) cluster_mgmt_from_master_check() pids = vcat(pids...) if waitfor == 0 - t = @async _rmprocs(pids, typemax(Int)) + t = @async _rmprocs(pids, typemax(Int), callback_timeout) yield() return t else - _rmprocs(pids, waitfor) + _rmprocs(pids, waitfor, callback_timeout) # return a dummy task object that user code can wait on. 
return @async nothing end end -function _rmprocs(pids, waitfor) +function _rmprocs(pids, waitfor, callback_timeout) lock(worker_lock) try + # Run the callbacks + callback_tasks = Dict{Any, Task}() + for pid in pids + for (name, callback) in worker_exiting_callbacks + callback_tasks[name] = Threads.@spawn callback(pid) + end + end + + if timedwait(() -> all(istaskdone.(values(callback_tasks))), callback_timeout) === :timed_out + timedout_callbacks = ["'$(key)'" for (key, task) in callback_tasks if !istaskdone(task)] + callbacks_str = join(timedout_callbacks, ", ") + @warn "Some worker-exiting callbacks have not yet finished, continuing to remove workers anyway. These are the callbacks still running: $(callbacks_str)" + end + rmprocset = Union{LocalProcess, Worker}[] for p in pids if p == 1 @@ -1256,6 +1430,18 @@ function deregister_worker(pg, pid) delete!(pg.refs, id) end end + + # Call callbacks on the master + if myid() == 1 + for (name, callback) in worker_exited_callbacks + try + callback(pid, w.state) + catch ex + @error "Error when running worker-exited callback '$(name)'" exception=(ex, catch_backtrace()) + end + end + end + return end diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 8bfc462..c01c381 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -1,6 +1,7 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license using DistributedNext, Random, Serialization, Sockets +import DistributedNext import DistributedNext: launch, manage @@ -1934,6 +1935,75 @@ include("splitrange.jl") end end +@testset "Worker state callbacks" begin + rmprocs(other_workers()) + + # Adding a callback with an invalid signature should fail + @test_throws ArgumentError DistributedNext.add_worker_started_callback(() -> nothing) + + # Smoke test to ensure that all the callbacks are executed + starting_managers = [] + started_workers = Int[] + exiting_workers = Int[] + exited_workers = [] + starting_key = DistributedNext.add_worker_starting_callback((manager, kwargs) -> push!(starting_managers, manager)) + started_key = DistributedNext.add_worker_started_callback(pid -> (push!(started_workers, pid); error("foo"))) + exiting_key = DistributedNext.add_worker_exiting_callback(pid -> push!(exiting_workers, pid)) + exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> push!(exited_workers, (pid, state))) + + # Test that the worker-started exception bubbles up + @test_throws TaskFailedException addprocs(1) + + pid = only(workers()) + @test only(starting_managers) isa DistributedNext.LocalManager + @test started_workers == [pid] + rmprocs(workers()) + @test exiting_workers == [pid] + @test exited_workers == [(pid, DistributedNext.WorkerState_terminated)] + + # Trying to reset an existing callback should fail + @test_throws ArgumentError DistributedNext.add_worker_started_callback(Returns(nothing); key=started_key) + + # Remove the callbacks + DistributedNext.remove_worker_starting_callback(starting_key) + DistributedNext.remove_worker_started_callback(started_key) + DistributedNext.remove_worker_exiting_callback(exiting_key) + DistributedNext.remove_worker_exited_callback(exited_key) + + # Test that the worker-exiting `callback_timeout` option works and that we + # get warnings about slow worker-started callbacks. 
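+    # (the worker-started callback sleeps for longer than the
+    # `callback_warning_interval`, and the worker-exiting callback blocks on
+    # `event` until after the `callback_timeout` has expired)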
+ event = Base.Event() + callback_task = nothing + started_key = DistributedNext.add_worker_started_callback(_ -> sleep(0.5)) + exiting_key = DistributedNext.add_worker_exiting_callback(_ -> (callback_task = current_task(); wait(event))) + + @test_logs (:warn, r"Waiting for these worker-started callbacks.+") match_mode=:any addprocs(1; callback_warning_interval=0.05) + DistributedNext.remove_worker_started_callback(started_key) + + @test_logs (:warn, r"Some worker-exiting callbacks have not yet finished.+") rmprocs(workers(); callback_timeout=0.5) + DistributedNext.remove_worker_exiting_callback(exiting_key) + + notify(event) + wait(callback_task) + + # Test that the initial callbacks were indeed removed + @test length(starting_managers) == 1 + @test length(started_workers) == 1 + @test length(exiting_workers) == 1 + @test length(exited_workers) == 1 + + # Test that workers that were killed forcefully are detected as such + exit_state = nothing + DistributedNext.add_worker_exited_callback((pid, state) -> exit_state = state) + pid = only(addprocs(1)) + + redirect_stderr(devnull) do + remote_do(exit, pid) + timedwait(() -> !isnothing(exit_state), 10) + end + @test exit_state == DistributedNext.WorkerState_exterminated +end + # Run topology tests last after removing all workers, since a given # cluster at any time only supports a single topology. if nprocs() > 1 From 914e50c1b2ff10caa877c941bc9e5da2f6386557 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Wed, 11 Dec 2024 00:08:19 +0100 Subject: [PATCH 4/9] Add an extension to support Revise --- .github/workflows/ci.yml | 4 +-- Project.toml | 11 +++++-- docs/src/_changelog.md | 1 + ext/ReviseExt.jl | 30 +++++++++++++++++++ test/distributed_exec.jl | 64 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 105 insertions(+), 5 deletions(-) create mode 100644 ext/ReviseExt.jl diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d8f14f5..0b8d4cb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,7 +25,7 @@ jobs: version: - 'nightly' - '1' - - '1.9' + - '1.10' os: - ubuntu-latest - macOS-latest @@ -36,8 +36,6 @@ jobs: exclude: - os: macOS-latest arch: x86 - - os: windows-latest # Killing workers doesn't work on windows in 1.9 - version: '1.9' steps: - uses: actions/checkout@v4 diff --git a/Project.toml b/Project.toml index 26447ea..7093abf 100644 --- a/Project.toml +++ b/Project.toml @@ -7,12 +7,19 @@ Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" +[weakdeps] +Revise = "295af30f-e4ad-537b-8983-00126c2a3abe" + +[extensions] +ReviseExt = "Revise" + [compat] Distributed = "1" Random = "1" +Revise = "3.7.0" Serialization = "1" Sockets = "1" -julia = "1.9" +julia = "1.10" [extras] Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" @@ -21,4 +28,4 @@ LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["LinearAlgebra", "Test", "LibSSH", "Distributed"] +test = ["LinearAlgebra", "Test", "LibSSH", "Distributed", "Revise"] diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md index bce0c98..b297e3e 100644 --- a/docs/src/_changelog.md +++ b/docs/src/_changelog.md @@ -19,6 +19,7 @@ This documents notable changes in DistributedNext.jl. The format is based on - [`other_workers()`](@ref) and [`other_procs()`](@ref) were implemented and exported ([#18]). - Implemented callback support for workers being added/removed etc ([#17]). 
+- Added a package extension to support Revise.jl ([#17]). ## [v1.0.0] - 2024-12-02 diff --git a/ext/ReviseExt.jl b/ext/ReviseExt.jl new file mode 100644 index 0000000..269add5 --- /dev/null +++ b/ext/ReviseExt.jl @@ -0,0 +1,30 @@ +module ReviseExt + +import DistributedNext +import DistributedNext: myid, workers, remotecall + +import Revise + + +struct DistributedNextWorker <: Revise.AbstractWorker + id::Int +end + +function get_workers() + map(DistributedNextWorker, workers()) +end + +function Revise.remotecall_impl(f, worker::DistributedNextWorker, args...; kwargs...) + remotecall(f, worker.id, args...; kwargs...) +end + +Revise.is_master_worker(::typeof(get_workers)) = myid() == 1 +Revise.is_master_worker(worker::DistributedNextWorker) = worker.id == 1 + +function __init__() + Revise.register_workers_function(get_workers) + DistributedNext.add_worker_started_callback(pid -> Revise.init_worker(DistributedNextWorker(pid)); + key="DistributedNext-integration") +end + +end diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index c01c381..3e0dfe0 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -1,5 +1,6 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license +import Revise using DistributedNext, Random, Serialization, Sockets import DistributedNext import DistributedNext: launch, manage @@ -2004,6 +2005,69 @@ end @test exit_state == DistributedNext.WorkerState_exterminated end +# This is a simplified copy of a test from Revise.jl's tests +@testset "Revise.jl integration" begin + function rm_precompile(pkgname::AbstractString) + filepath = Base.cache_file_entry(Base.PkgId(pkgname)) + isa(filepath, Tuple) && (filepath = filepath[1]*filepath[2]) # Julia 1.3+ + for depot in DEPOT_PATH + fullpath = joinpath(depot, filepath) + isfile(fullpath) && rm(fullpath) + end + end + + pid = only(addprocs(1)) + + # Test that initialization succeeds by checking that Main.whichtt is defined + # on the worker, which is defined by Revise.init_worker(). + @test timedwait(() ->remotecall_fetch(() -> hasproperty(Main, :whichtt), pid), 10) == :ok + + tmpdir = mktempdir() + @everywhere push!(LOAD_PATH, $tmpdir) # Don't want to share this LOAD_PATH + + # Create a fake package + module_file = joinpath(tmpdir, "ReviseDistributed", "src", "ReviseDistributed.jl") + mkpath(dirname(module_file)) + write(module_file, + """ + module ReviseDistributed + + f() = π + g(::Int) = 0 + + end + """) + + # Check that we can use it + @everywhere using ReviseDistributed + for p in procs() + @test remotecall_fetch(ReviseDistributed.f, p) == π + @test remotecall_fetch(ReviseDistributed.g, p, 1) == 0 + end + + # Test changing and deleting methods + write(module_file, + """ + module ReviseDistributed + + f() = 3.0 + + end + """) + Revise.revise() + for p in procs() + # We use timedwait() here because worker updates from Revise are asynchronous + @test timedwait(() -> remotecall_fetch(ReviseDistributed.f, p) == 3.0, 10) == :ok + + @test_throws RemoteException remotecall_fetch(ReviseDistributed.g, p, 1) + end + + rmprocs(workers()) + rm_precompile("ReviseDistributed") + pop!(LOAD_PATH) +end + + # Run topology tests last after removing all workers, since a given # cluster at any time only supports a single topology. 
if nprocs() > 1 From 468fcc01b2009b78e24f1446e2cfbb57543102f7 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Thu, 2 Jan 2025 23:11:27 +0100 Subject: [PATCH 5/9] Clean up CI a bit --- .github/workflows/ci.yml | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0b8d4cb..72a24aa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,21 +39,11 @@ jobs: steps: - uses: actions/checkout@v4 - - uses: julia-actions/setup-julia@v2 + - uses: julia-actions/setup-julia@latest with: version: ${{ matrix.version }} arch: ${{ matrix.arch }} - - uses: actions/cache@v4 - env: - cache-name: cache-artifacts - with: - path: ~/.julia/artifacts - key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }} - restore-keys: | - ${{ runner.os }}-test-${{ env.cache-name }}- - ${{ runner.os }}-test-${{ matrix.os }} - ${{ runner.os }}- - - uses: julia-actions/julia-buildpkg@v1 + - uses: julia-actions/cache@v2 - uses: julia-actions/julia-runtest@v1 env: JULIA_NUM_THREADS: 4 @@ -68,8 +58,7 @@ jobs: steps: - uses: actions/checkout@v4 - uses: julia-actions/setup-julia@latest - with: - version: '1' + - uses: julia-actions/cache@v2 - name: Install dependencies run: julia --project=docs/ -e 'using Pkg; Pkg.instantiate()' - name: Build and deploy From 0d5aaa39b496c9315905584d087f54e2ea7fbc02 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Fri, 3 Jan 2025 00:13:59 +0100 Subject: [PATCH 6/9] Replace a timeout task with timedwait() This should fix an exception seen in CI from the lingering timeout task: ``` Test Summary: | Pass Total Time Deserialization error recovery and include() | 11 11 3.9s From worker 4: Unhandled Task ERROR: EOFError: read end of file From worker 4: Stacktrace: From worker 4: [1] wait From worker 4: @ .\asyncevent.jl:159 [inlined] From worker 4: [2] sleep(sec::Float64) From worker 4: @ Base .\asyncevent.jl:265 From worker 4: [3] (::DistributedNext.var"#34#37"{DistributedNext.Worker, Float64})() From worker 4: @ DistributedNext D:\a\DistributedNext.jl\DistributedNext.jl\src\cluster.jl:213 ``` --- src/cluster.jl | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/cluster.jl b/src/cluster.jl index c0d616e..9a29114 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -209,16 +209,10 @@ function wait_for_conn(w) timeout = worker_timeout() - (time() - w.ct_time) timeout <= 0 && error("peer $(w.id) has not connected to $(myid())") - T = Threads.@spawn begin - sleep($timeout) - lock(w.c_state) do - notify(w.c_state; all=true) - end - end - errormonitor(T) - lock(w.c_state) do - wait(w.c_state) - (@atomic w.state) === WorkerState_created && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds") + if timedwait(() -> (@atomic w.state) === WorkerState_connected, timeout) === :timed_out + # Notify any waiters on the state and throw + @lock w.c_state notify(w.c_state) + error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds") end end nothing From ecd83b24b17d48fbee7eb13475e118a7d673295e Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Fri, 3 Jan 2025 00:36:53 +0100 Subject: [PATCH 7/9] fixup! 
Add support for worker state callbacks --- test/distributed_exec.jl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 3e0dfe0..ee81e7f 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -1995,7 +1995,7 @@ end # Test that workers that were killed forcefully are detected as such exit_state = nothing - DistributedNext.add_worker_exited_callback((pid, state) -> exit_state = state) + exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> exit_state = state) pid = only(addprocs(1)) redirect_stderr(devnull) do @@ -2003,6 +2003,7 @@ end timedwait(() -> !isnothing(exit_state), 10) end @test exit_state == DistributedNext.WorkerState_exterminated + DistributedNext.remove_worker_exited_callback(exited_key) end # This is a simplified copy of a test from Revise.jl's tests From e23c490116d731fa2a1c96817d28aa0db33bb058 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Fri, 3 Jan 2025 00:43:20 +0100 Subject: [PATCH 8/9] fixup! Add an extension to support Revise --- test/distributed_exec.jl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index ee81e7f..a235a16 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -1915,7 +1915,9 @@ include("splitrange.jl") @testset "Clear all workers for timeout tests (issue #45785)" begin nprocs() > 1 && rmprocs(workers()) - begin + + # This test requires kill(), and that doesn't work on Windows before 1.11 + if !(Sys.iswindows() && VERSION < v"1.11") # First, assert that we get no messages when we close a cooperative worker w = only(addprocs(1)) @test_nowarn begin @@ -1933,6 +1935,8 @@ include("splitrange.jl") end wait(rmprocs([w])) end + else + @warn "Skipping timeout tests because kill() isn't supported on Windows for this Julia version" end end From 64aba0095e2178011fe33dc8b5988f0dea701dc5 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Sun, 5 Jan 2025 16:08:09 +0100 Subject: [PATCH 9/9] Add support for worker statuses --- docs/src/_changelog.md | 2 ++ docs/src/index.md | 2 ++ src/cluster.jl | 71 +++++++++++++++++++++++++++++++++++++--- test/distributed_exec.jl | 32 +++++++++++++++--- 4 files changed, 97 insertions(+), 10 deletions(-) diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md index b297e3e..304a1f0 100644 --- a/docs/src/_changelog.md +++ b/docs/src/_changelog.md @@ -20,6 +20,8 @@ This documents notable changes in DistributedNext.jl. The format is based on exported ([#18]). - Implemented callback support for workers being added/removed etc ([#17]). - Added a package extension to support Revise.jl ([#17]). +- Added support for setting worker statuses with [`setstatus`](@ref) and + [`getstatus`](@ref) ([#17]). 
## [v1.0.0] - 2024-12-02 diff --git a/docs/src/index.md b/docs/src/index.md index e0cf6cf..c3cc5b1 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -14,6 +14,8 @@ DistributedNext.rmprocs DistributedNext.interrupt DistributedNext.myid DistributedNext.pmap +DistributedNext.getstatus +DistributedNext.setstatus DistributedNext.RemoteException DistributedNext.ProcessExitedException DistributedNext.Future diff --git a/src/cluster.jl b/src/cluster.jl index 9a29114..b1d9c7a 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -870,6 +870,8 @@ const LPROC = LocalProcess() const LPROCROLE = Ref{Symbol}(:master) const HDR_VERSION_LEN=16 const HDR_COOKIE_LEN=16 +const map_pid_statuses = Dict{Int, Any}() +const map_pid_statuses_lock = ReentrantLock() const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}() const map_sock_wrkr = IdDict() const map_del_wrkr = Set{Int}() @@ -1010,15 +1012,16 @@ for any reason (i.e. not only because of [`rmprocs()`](@ref) but also the worker segfaulting etc). Chooses and returns a unique key for the callback if `key` is not specified. -The callback will be called with the worker ID and the final -`Distributed.WorkerState` of the worker, e.g. `f(w::Int, state)`. `state` is an +The callback will be called with the worker ID, the final +`Distributed.WorkerState` of the worker, and the last status of the worker as +set by [`setstatus`](@ref), e.g. `f(w::Int, state, status)`. `state` is an enum, a value of `WorkerState_terminated` means a graceful exit and a value of `WorkerState_exterminated` means the worker died unexpectedly. If the callback throws an exception it will be caught and printed. """ add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exited_callbacks; - arg_types=Tuple{Int, WorkerState}) + arg_types=Tuple{Int, WorkerState, Any}) """ remove_worker_exited_callback(key) @@ -1206,6 +1209,59 @@ Identical to [`workers()`](@ref) except that the current worker is filtered out. """ other_workers() = filter(!=(myid()), workers()) +""" + setstatus(x, pid::Int=myid()) + +Set the status for worker `pid` to `x`. `x` may be any serializable object but +it's recommended to keep it small enough to cheaply send over a network. The +status will be passed to the worker-exited callbacks (see +[`add_worker_exited_callback`](@ref)) when the worker exits. + +This can be handy if you want a way to know what a worker is doing at any given +time, or (in combination with a worker-exited callback) for knowing what a +worker was last doing before it died. + +# Examples +```julia-repl +julia> DistributedNext.setstatus("working on dataset 42") +"working on dataset 42" + +julia> DistributedNext.getstatus() +"working on dataset 42" +``` +""" +function setstatus(x, pid::Int=myid()) + if pid ∉ procs() + throw(ArgumentError("Worker $(pid) does not exist, cannot set its status")) + end + + if myid() == 1 + @lock map_pid_statuses_lock map_pid_statuses[pid] = x + else + remotecall_fetch(setstatus, 1, x, myid()) + end +end + +_getstatus(pid) = @lock map_pid_statuses_lock get!(map_pid_statuses, pid, nothing) + +""" + getstatus(pid::Int=myid()) + +Get the status for worker `pid`. If one was never explicitly set with +[`setstatus`](@ref) this will return `nothing`. 
+""" +function getstatus(pid::Int=myid()) + if pid ∉ procs() + throw(ArgumentError("Worker $(pid) does not exist, cannot get its status")) + end + + if myid() == 1 + _getstatus(pid) + else + remotecall_fetch(getstatus, 1, pid) + end +end + function cluster_mgmt_from_master_check() if myid() != 1 throw(ErrorException("Only process 1 can add and remove workers")) @@ -1425,15 +1481,20 @@ function deregister_worker(pg, pid) end end - # Call callbacks on the master if myid() == 1 + status = _getstatus(pid) + + # Call callbacks on the master for (name, callback) in worker_exited_callbacks try - callback(pid, w.state) + callback(pid, w.state, status) catch ex @error "Error when running worker-exited callback '$(name)'" exception=(ex, catch_backtrace()) end end + + # Delete its status + @lock map_pid_statuses_lock delete!(map_pid_statuses, pid) end return diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index a235a16..b15c426 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -3,7 +3,7 @@ import Revise using DistributedNext, Random, Serialization, Sockets import DistributedNext -import DistributedNext: launch, manage +import DistributedNext: launch, manage, getstatus, setstatus @test cluster_cookie() isa String @@ -1940,6 +1940,24 @@ include("splitrange.jl") end end +@testset "Worker statuses" begin + rmprocs(other_workers()) + + # Test with the local worker + @test isnothing(getstatus()) + setstatus("foo") + @test getstatus() == "foo" + @test_throws ArgumentError getstatus(2) + + # Test with a remote worker + pid = only(addprocs(1)) + @test isnothing(getstatus(pid)) + remotecall_wait(setstatus, pid, "bar", pid) + @test remotecall_fetch(getstatus, pid) == "bar" + + rmprocs(pid) +end + @testset "Worker state callbacks" begin rmprocs(other_workers()) @@ -1954,7 +1972,7 @@ end starting_key = DistributedNext.add_worker_starting_callback((manager, kwargs) -> push!(starting_managers, manager)) started_key = DistributedNext.add_worker_started_callback(pid -> (push!(started_workers, pid); error("foo"))) exiting_key = DistributedNext.add_worker_exiting_callback(pid -> push!(exiting_workers, pid)) - exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> push!(exited_workers, (pid, state))) + exited_key = DistributedNext.add_worker_exited_callback((pid, state, status) -> push!(exited_workers, (pid, state, status))) # Test that the worker-started exception bubbles up @test_throws TaskFailedException addprocs(1) @@ -1964,7 +1982,7 @@ end @test started_workers == [pid] rmprocs(workers()) @test exiting_workers == [pid] - @test exited_workers == [(pid, DistributedNext.WorkerState_terminated)] + @test exited_workers == [(pid, DistributedNext.WorkerState_terminated, nothing)] # Trying to reset an existing callback should fail @test_throws ArgumentError DistributedNext.add_worker_started_callback(Returns(nothing); key=started_key) @@ -1997,16 +2015,20 @@ end @test length(exiting_workers) == 1 @test length(exited_workers) == 1 - # Test that workers that were killed forcefully are detected as such + # Test that workers that were killed forcefully are detected as such, and + # that statuses are passed properly. 
exit_state = nothing - exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> exit_state = state) + last_status = nothing + exited_key = DistributedNext.add_worker_exited_callback((pid, state, status) -> (exit_state = state; last_status = status)) pid = only(addprocs(1)) + setstatus("foo", pid) redirect_stderr(devnull) do remote_do(exit, pid) timedwait(() -> !isnothing(exit_state), 10) end @test exit_state == DistributedNext.WorkerState_exterminated + @test last_status == "foo" DistributedNext.remove_worker_exited_callback(exited_key) end