Skip to content

Commit

Permalink
Use modified SlurmManager
Browse files Browse the repository at this point in the history
  • Loading branch information
oschulz committed Apr 14, 2024
1 parent d4f3ba7 commit fb3c66b
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/LegendDataManagement.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ include("calibration_functions.jl")
include("lprops.jl")
include("utils/utils.jl")

include("slurm.jl")

end # module
171 changes: 171 additions & 0 deletions src/slurm.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# ClusterManager for Slurm

# This code is a modified version of ClusterManagers.SlurmManager, both
# original code and modifications are licensed under the MIT License (MIT):
# https://github.com/JuliaParallel/ClusterManagers.jl/blob/master/LICENSE.md

# Modifications are to be upstreamed are planned to be upstreamed as far
# as possible, once proved in the field.


# ==================================================================
using Base: ExponentialBackOff
using Distributed: ClusterManager, WorkerConfig
using ClusterManagers: worker_arg

import Distributed: launch, manage
# ==================================================================


struct SlurmManager <: ClusterManager
np::Integer
retry_delays
end

struct SlurmException <: Exception
msg
end

function launch(manager::SlurmManager, params::Dict, instances_arr::Array,

Check warning on line 29 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L29

Added line #L29 was not covered by tests
c::Condition)

try
exehome = params[:dir]
exename = params[:exename]
exeflags = params[:exeflags]

Check warning on line 35 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L32-L35

Added lines #L32 - L35 were not covered by tests

stdkeys = keys(Distributed.default_addprocs_params())

Check warning on line 37 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L37

Added line #L37 was not covered by tests

p = filter(x->(!(x[1] in stdkeys) && x[1] != :job_file_loc), params)

Check warning on line 39 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L39

Added line #L39 was not covered by tests

srunargs = []
for k in keys(p)
if length(string(k)) == 1
push!(srunargs, "-$k")
val = p[k]
if length(val) > 0
push!(srunargs, "$(p[k])")

Check warning on line 47 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L41-L47

Added lines #L41 - L47 were not covered by tests
end
else
k2 = replace(string(k), "_"=>"-")
val = p[k]
if length(val) > 0
push!(srunargs, "--$(k2)=$(p[k])")

Check warning on line 53 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L50-L53

Added lines #L50 - L53 were not covered by tests
else
push!(srunargs, "--$(k2)")

Check warning on line 55 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L55

Added line #L55 was not covered by tests
end
end
end

Check warning on line 58 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L58

Added line #L58 was not covered by tests

# Get job file location from parameter dictionary.
job_file_loc = joinpath(exehome, get(params, :job_file_loc, "."))

Check warning on line 61 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L61

Added line #L61 was not covered by tests

# Make directory if not already made.
if !isdir(job_file_loc)
mkdir(job_file_loc)

Check warning on line 65 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L64-L65

Added lines #L64 - L65 were not covered by tests
end

# Check for given output file name
jobname = "julia-$(getpid())"
has_output_name = ("-o" in srunargs) | ("--output" in srunargs)
if has_output_name
loc = findfirst(x-> x == "-o" || x == "--output", srunargs)
job_output_name = srunargs[loc+1]
job_output_template = joinpath(job_file_loc, job_output_name)
srunargs[loc+1] = job_output_template

Check warning on line 75 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L69-L75

Added lines #L69 - L75 were not covered by tests
else
job_output_name = "$(jobname)-$(trunc(Int, Base.time() * 10))"
make_job_output_path(task_num) = joinpath(job_file_loc, "$(job_output_name)-$(task_num).out")
job_output_template = make_job_output_path("%4t")
push!(srunargs, "-o", job_output_template)

Check warning on line 80 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L77-L80

Added lines #L77 - L80 were not covered by tests
end

np = manager.np
srun_cmd = `srun -J $jobname -n $np -D $exehome $(srunargs) $exename $exeflags $(worker_arg())`

Check warning on line 84 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L83-L84

Added lines #L83 - L84 were not covered by tests

global g_state = (;manager, params, instances_arr, c, srun_cmd, has_output_name, job_output_name, make_job_output_path, job_output_template)

Check warning on line 86 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L86

Added line #L86 was not covered by tests

@info "Starting SLURM job $jobname: $srun_cmd"
srun_proc = open(srun_cmd)

Check warning on line 89 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L88-L89

Added lines #L88 - L89 were not covered by tests

slurm_spec_regex = r"([\w]+):([\d]+)#(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})"
could_not_connect_regex = r"could not connect"
exiting_regex = r"exiting."
retry_delays = manager.retry_delays

Check warning on line 94 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L91-L94

Added lines #L91 - L94 were not covered by tests

t_start = time()
t_waited = round(Int, time() - t_start)
for i = 0:np - 1
slurm_spec_match::Union{RegexMatch,Nothing} = nothing
worker_errors = String[]
if has_output_name
fn = job_output_template

Check warning on line 102 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L96-L102

Added lines #L96 - L102 were not covered by tests
else
fn = make_job_output_path(lpad(i, 4, "0"))

Check warning on line 104 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L104

Added line #L104 was not covered by tests
end
for retry_delay in push!(collect(retry_delays), 0)
t_waited = round(Int, time() - t_start)

Check warning on line 107 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L106-L107

Added lines #L106 - L107 were not covered by tests

# Wait for output log to be created and populated, then parse

if isfile(fn)
if filesize(fn) > 0
open(fn) do f

Check warning on line 113 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L111-L113

Added lines #L111 - L113 were not covered by tests
# Due to error and warning messages, the specification
# may not appear on the file's first line
for line in eachline(f)
re_match = match(slurm_spec_regex, line)
if !isnothing(re_match)
slurm_spec_match = re_match

Check warning on line 119 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L116-L119

Added lines #L116 - L119 were not covered by tests
end
for expr in [could_not_connect_regex, exiting_regex]
if !isnothing(match(expr, line))
slurm_spec_match = nothing
push!(worker_errors, line)

Check warning on line 124 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L121-L124

Added lines #L121 - L124 were not covered by tests
end
end
end

Check warning on line 127 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L126-L127

Added lines #L126 - L127 were not covered by tests
end
end
if !isempty(worker_errors) || !isnothing(slurm_spec_match)
break # break if error or specification found

Check warning on line 131 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L130-L131

Added lines #L130 - L131 were not covered by tests
else
@info "Worker $i (after $t_waited s): Output file found, but no connection details yet"

Check warning on line 133 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L133

Added line #L133 was not covered by tests
end
else
@info "Worker $i (after $t_waited s): No output file \"$fn\" yet"

Check warning on line 136 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L136

Added line #L136 was not covered by tests
end

# Sleep for some time to limit ressource usage while waiting for the job to start
sleep(retry_delay)
end

Check warning on line 141 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L140-L141

Added lines #L140 - L141 were not covered by tests

if !isempty(worker_errors)
throw(SlurmException("Worker $i failed after $t_waited s: $(join(worker_errors, " "))"))
elseif isnothing(slurm_spec_match)
throw(SlurmException("Timeout after $t_waited s while waiting for worker $i to get ready."))

Check warning on line 146 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L143-L146

Added lines #L143 - L146 were not covered by tests
end

config = WorkerConfig()
config.port = parse(Int, slurm_spec_match[2])
config.host = strip(slurm_spec_match[3])
@info "Worker $i ready after $t_waited s on host $(config.host), port $(config.port)"

Check warning on line 152 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L149-L152

Added lines #L149 - L152 were not covered by tests
# Keep a reference to the proc, so it's properly closed once
# the last worker exits.
config.userdata = srun_proc
push!(instances_arr, config)
notify(c)
end

Check warning on line 158 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L155-L158

Added lines #L155 - L158 were not covered by tests
catch e
println("Error launching Slurm job:")
rethrow(e)

Check warning on line 161 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L160-L161

Added lines #L160 - L161 were not covered by tests
end
end

function manage(manager::SlurmManager, id::Integer, config::WorkerConfig,
op::Symbol)

Check warning on line 166 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L165-L166

Added lines #L165 - L166 were not covered by tests
# This function needs to exist, but so far we don't do anything
end

SlurmManager(np::Integer) = SlurmManager(np, ExponentialBackOff(n=10, first_delay=1,

Check warning on line 170 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L170

Added line #L170 was not covered by tests
max_delay=512, factor=2))
2 changes: 1 addition & 1 deletion src/workers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ function _addprocs_slurm(
slurm_mem_per_cpu = parse(Int, ENV["SLURM_MEM_PER_CPU"]) * 1024^2
slurm_mem_per_task = slurm_nthreads * slurm_mem_per_cpu

Check warning on line 171 in src/workers.jl

View check run for this annotation

Codecov / codecov/patch

src/workers.jl#L167-L171

Added lines #L167 - L171 were not covered by tests

cluster_manager = ClusterManagers.SlurmManager(slurm_ntasks, retry_delays)
cluster_manager = LegendDataManagement.SlurmManager(slurm_ntasks, retry_delays)
worker_timeout = round(Int, max(sum(cluster_manager.retry_delays), 60))
ENV["JULIA_WORKER_TIMEOUT"] = "$worker_timeout"

Check warning on line 175 in src/workers.jl

View check run for this annotation

Codecov / codecov/patch

src/workers.jl#L173-L175

Added lines #L173 - L175 were not covered by tests

Expand Down

0 comments on commit fb3c66b

Please sign in to comment.