Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SLURM support #43

Merged
merged 3 commits into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ uuid = "9feedd95-f0e0-423f-a8dc-de0970eae6b3"
version = "0.2.7"

[deps]
ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Format = "1fa38f19-a742-5d3f-a2b9-30dd87b9d5f8"
Glob = "c27321d9-0574-5035-807b-f59d2c89b15c"
IntervalSets = "8197267c-284f-5f27-9208-e0e47529a953"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
LRUCache = "8ac3fa9e-de4c-5943-b1dc-09c6b5f20637"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
MIMEs = "6c6e2e6c-3030-632d-7369-2d6c69616d65"
Markdown = "d6f4376e-aef5-505a-96c1-9c027394607a"
Measurements = "eff96d63-e80a-5855-80a2-b1b0885c5ab7"
Expand All @@ -23,6 +25,7 @@ RecipesBase = "3cdcf5f2-1ef4-517c-9805-6587b60abb01"
StaticStrings = "4db0a0c5-418a-4e1d-8806-cb305fe13294"
StructArrays = "09ab397b-f2b6-538f-b94a-2f83cf4a842a"
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042"
TypedTables = "9d95f2ec-7b3d-5a63-8d20-e2491e220bb9"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
Unitful = "1986cc42-f94f-5a68-af5c-568840ba703d"
Expand All @@ -36,13 +39,15 @@ LegendDataManagementSolidStateDetectorsExt = "SolidStateDetectors"
LegendDataManagementRecipesBaseExt = "RecipesBase"

[compat]
ClusterManagers = "0.4.5"
Dates = "1"
Distributed = "1"
Format = "1"
Glob = "1"
IntervalSets = "0.6, 0.7"
JSON = "0.21, 1"
LRUCache = "1.5"
LinearAlgebra = "1"
MIMEs = "0.1"
Markdown = "1"
Measurements = "2"
Expand All @@ -57,7 +62,8 @@ SolidStateDetectors = "0.8, 0.9"
StaticStrings = "0.2"
StructArrays = "0.4, 0.5, 0.6"
Tables = "0.2, 1.0"
ThreadPinning = "0.7"
TypedTables = "1.4"
UUIDs = "1"
Unitful = "1"
julia = "1.9"
julia = "1.9"
5 changes: 5 additions & 0 deletions src/LegendDataManagement.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ using Dates
using UUIDs

import Distributed
import LinearAlgebra
import Pkg

using Glob
Expand All @@ -32,6 +33,9 @@ using TypedTables
import Markdown
using MIMEs: mime_from_extension

import ThreadPinning
import ClusterManagers

include("legend_report.jl")
include("status_types.jl")
include("atomic_fcreate.jl")
Expand All @@ -50,5 +54,6 @@ include("calibration_functions.jl")
include("lprops.jl")
include("utils/utils.jl")

include("slurm.jl")

end # module
171 changes: 171 additions & 0 deletions src/slurm.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# ClusterManager for Slurm

# This code is a modified version of ClusterManagers.SlurmManager, both
# original code and modifications are licensed under the MIT License (MIT):
# https://github.com/JuliaParallel/ClusterManagers.jl/blob/master/LICENSE.md

# Modifications are planned to be upstreamed as far as possible, once
# proven in the field.


# ==================================================================
using Base: ExponentialBackOff
using Distributed: ClusterManager, WorkerConfig
using ClusterManagers: worker_arg

import Distributed: launch, manage
# ==================================================================


"""
    SlurmManager(np::Integer, retry_delays)

Cluster manager that starts Julia worker processes through Slurm's `srun`.

# Fields
- `np`: number of tasks requested from `srun` (passed as `-n`), i.e. the number
  of workers to launch.
- `retry_delays`: iterable of delays that are slept between successive checks
  of a worker's output file while waiting for the worker to come up.
"""
struct SlurmManager <: ClusterManager
    np::Integer
    retry_delays::Any
end

"""
    SlurmException(msg)

Exception thrown by `launch` for a `SlurmManager` when a worker reports a
failure in its output file or does not become ready before the retry delays
are exhausted. `msg` holds the human-readable description.
"""
struct SlurmException <: Exception
    msg
end

# Show the message itself instead of the default struct dump, so that errors
# surfacing from `addprocs` are readable.
Base.showerror(io::IO, err::SlurmException) = print(io, "SlurmException: ", err.msg)

# Translate the non-standard `addprocs` keyword arguments in `extra_params`
# into `srun` command line arguments.
function _slurm_srun_args(extra_params)
    srunargs = String[]
    for (key, val) in extra_params
        name = string(key)
        if length(name) == 1
            # Single-letter keys become short options: `-k [value]`.
            push!(srunargs, "-$name")
            length(val) > 0 && push!(srunargs, "$val")
        else
            # Longer keys become long options: `--some-key[=value]`.
            flag = replace(name, "_" => "-")
            push!(srunargs, length(val) > 0 ? "--$(flag)=$(val)" : "--$(flag)")
        end
    end
    return srunargs
end

# Scan the worker output file `fn`. Returns `(spec, errors)`, where `spec` is the
# last regex match of a worker connection specification (or `nothing`) and
# `errors` holds all lines that match a known failure message.
function _scan_worker_log(fn)
    slurm_spec_regex = r"([\w]+):([\d]+)#(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})"
    error_regexes = (r"could not connect", r"exiting.")

    spec = nothing
    errors = String[]
    # Due to error and warning messages, the specification
    # may not appear on the file's first line
    for line in eachline(fn)
        m = match(slurm_spec_regex, line)
        isnothing(m) || (spec = m)
        if any(re -> occursin(re, line), error_regexes)
            # A failure message invalidates any specification seen so far.
            spec = nothing
            push!(errors, line)
        end
    end
    return spec, errors
end

"""
    launch(manager::SlurmManager, params::Dict, instances_arr::Array, c::Condition)

Start `manager.np` Julia workers with a single `srun` invocation and register
them with `Distributed`.

All entries of `params` that are neither standard `addprocs` parameters nor
`:job_file_loc` are forwarded to `srun` as command line options. The worker
output files are written to `joinpath(params[:dir], params[:job_file_loc])`
(default `"."`), which is created if necessary. For each worker, its output
file is polled (sleeping according to `manager.retry_delays` in between) until
the worker's connection specification appears; a `WorkerConfig` with the
parsed host and port is then pushed to `instances_arr` and `c` is notified.

# Throws
- `SlurmException` if a worker's output contains a known failure message, or if
  no connection specification was found after all retry delays have elapsed.
- Any other exception raised while building or starting the job is logged and
  rethrown.
"""
function launch(manager::SlurmManager, params::Dict, instances_arr::Array,
                c::Condition)
    try
        exehome = params[:dir]
        exename = params[:exename]
        exeflags = params[:exeflags]

        # Everything that is not a standard `addprocs` parameter (and not our
        # own `:job_file_loc`) is forwarded to `srun`.
        stdkeys = keys(Distributed.default_addprocs_params())
        p = filter(x -> !(x[1] in stdkeys) && x[1] != :job_file_loc, params)
        srunargs = _slurm_srun_args(p)

        # Get job file location from parameter dictionary.
        job_file_loc = joinpath(exehome, get(params, :job_file_loc, "."))

        # Make directory (including missing parents) if not already made.
        mkpath(job_file_loc)

        # Check for given output file name
        jobname = "julia-$(getpid())"
        loc = findfirst(x -> x == "-o" || x == "--output", srunargs)
        has_output_name = !isnothing(loc)
        if has_output_name
            # NOTE(review): all workers are then read from this one file name,
            # verbatim; confirm that per-task patterns are not expected here.
            job_output_name = srunargs[loc + 1]
            job_output_template = joinpath(job_file_loc, job_output_name)
            srunargs[loc + 1] = job_output_template
        else
            job_output_name = "$(jobname)-$(trunc(Int, time() * 10))"
            # The task number is inserted by srun via its `%4t` pattern.
            job_output_template = joinpath(job_file_loc, "$(job_output_name)-%4t.out")
            push!(srunargs, "-o", job_output_template)
        end

        np = manager.np
        srun_cmd = `srun -J $jobname -n $np -D $exehome $(srunargs) $exename $exeflags $(worker_arg())`

        @info "Starting SLURM job $jobname: $srun_cmd"
        srun_proc = open(srun_cmd)

        # The trailing zero gives one final check without sleeping afterwards.
        delays = push!(collect(manager.retry_delays), 0)

        t_start = time()
        t_waited = 0
        for i in 0:(np - 1)
            fn = if has_output_name
                job_output_template
            else
                joinpath(job_file_loc, "$(job_output_name)-$(lpad(i, 4, "0")).out")
            end

            slurm_spec_match = nothing
            worker_errors = String[]
            for retry_delay in delays
                t_waited = round(Int, time() - t_start)

                # Wait for output log to be created and populated, then parse
                if isfile(fn)
                    if filesize(fn) > 0
                        slurm_spec_match, worker_errors = _scan_worker_log(fn)
                    end
                    if !isempty(worker_errors) || !isnothing(slurm_spec_match)
                        break # break if error or specification found
                    end
                    @info "Worker $i (after $t_waited s): Output file found, but no connection details yet"
                else
                    @info "Worker $i (after $t_waited s): No output file \"$fn\" yet"
                end

                # Sleep for some time to limit resource usage while waiting for
                # the job to start
                sleep(retry_delay)
            end

            if !isempty(worker_errors)
                throw(SlurmException("Worker $i failed after $t_waited s: $(join(worker_errors, " "))"))
            elseif isnothing(slurm_spec_match)
                throw(SlurmException("Timeout after $t_waited s while waiting for worker $i to get ready."))
            end

            config = WorkerConfig()
            config.port = parse(Int, slurm_spec_match[2])
            config.host = strip(slurm_spec_match[3])
            @info "Worker $i ready after $t_waited s on host $(config.host), port $(config.port)"
            # Keep a reference to the proc, so it's properly closed once
            # the last worker exits.
            config.userdata = srun_proc
            push!(instances_arr, config)
            notify(c)
        end
    catch
        @error "Error launching Slurm job"
        rethrow()
    end
    return nothing
end

"""
    manage(manager::SlurmManager, id::Integer, config::WorkerConfig, op::Symbol)

Worker management hook for `SlurmManager`. The method has to exist, but
currently performs no action for any `op` and returns `nothing`.
"""
function manage(manager::SlurmManager, id::Integer, config::WorkerConfig, op::Symbol)
    # This function needs to exist, but so far we don't do anything
    return nothing
end

"""
    SlurmManager(np::Integer)

Create a `SlurmManager` for `np` workers with the default retry schedule:
an `ExponentialBackOff` of 10 delays, starting at 1, growing by a factor of 2
and capped at 512.
"""
function SlurmManager(np::Integer)
    default_delays = ExponentialBackOff(; n=10, first_delay=1, max_delay=512, factor=2)
    return SlurmManager(np, default_delays)
end
Loading
Loading