# ClusterManager for Slurm
#
# This code is a modified version of ClusterManagers.SlurmManager, both
# original code and modifications are licensed under the MIT License (MIT):
# https://github.com/JuliaParallel/ClusterManagers.jl/blob/master/LICENSE.md
#
# Modifications are planned to be upstreamed as far as possible, once
# proved in the field.
#
# NOTE(review): reconstructed from a whitespace-mangled patch. The same patch
# also adds `include("slurm.jl")` to src/LegendDataManagement.jl and switches
# src/workers.jl from `ClusterManagers.SlurmManager` to
# `LegendDataManagement.SlurmManager(slurm_ntasks, retry_delays)` — apply
# those two one-line hunks alongside this file.

# ==================================================================
using Base: ExponentialBackOff
using Distributed: ClusterManager, WorkerConfig
using ClusterManagers: worker_arg

import Distributed: launch, manage
# ==================================================================


"""
    SlurmManager(np::Integer[, retry_delays])

`Distributed` cluster manager that starts `np` Julia workers via SLURM's
`srun` and waits for each worker to report its connection details through
its SLURM output file.

`retry_delays` is an iterable of seconds to sleep between successive polls
of a worker's output file; it defaults to an exponential backoff
(10 tries, 1 s first delay, factor 2, capped at 512 s).
"""
struct SlurmManager{T} <: ClusterManager
    np::Int             # number of workers to launch (one srun task each)
    retry_delays::T     # iterable of poll delays in seconds
end

SlurmManager(np::Integer) = SlurmManager(np,
    ExponentialBackOff(n = 10, first_delay = 1, max_delay = 512, factor = 2))


"""
    SlurmException(msg)

Thrown when a SLURM worker reports a connection error or fails to publish
its connection details before the retry delays are exhausted.
"""
struct SlurmException <: Exception
    msg::String
end


"""
    launch(manager::SlurmManager, params::Dict, instances_arr::Array, c::Condition)

`Distributed` cluster-manager entry point: spawn `manager.np` workers with
`srun`, poll each worker's SLURM output file for the `host:port#ip`
connection line, and push a populated `WorkerConfig` onto `instances_arr`
for every worker that comes up.

Throws `SlurmException` if a worker logs a connection error or times out.
"""
function launch(manager::SlurmManager, params::Dict, instances_arr::Array,
                c::Condition)
    try
        exehome = params[:dir]
        exename = params[:exename]
        exeflags = params[:exeflags]

        # Every addprocs keyword that is not a standard Distributed parameter
        # (and not our private :job_file_loc) is forwarded as an srun option.
        stdkeys = keys(Distributed.default_addprocs_params())
        p = filter(x -> (!(x[1] in stdkeys) && x[1] != :job_file_loc), params)

        srunargs = []
        for k in keys(p)
            if length(string(k)) == 1
                # Single-letter keys become short options ("-o val").
                push!(srunargs, "-$k")
                val = p[k]
                if length(val) > 0
                    push!(srunargs, "$(p[k])")
                end
            else
                # Multi-letter keys become long options; Julia-style
                # underscores map to SLURM-style dashes.
                k2 = replace(string(k), "_" => "-")
                val = p[k]
                if length(val) > 0
                    push!(srunargs, "--$(k2)=$(p[k])")
                else
                    push!(srunargs, "--$(k2)")
                end
            end
        end

        # Get job file location from parameter dictionary.
        job_file_loc = joinpath(exehome, get(params, :job_file_loc, "."))

        # Make directory if not already made.
        if !isdir(job_file_loc)
            mkdir(job_file_loc)
        end

        # Check for a user-given output file name.
        # FIX: short-circuit || instead of bitwise | on Bools.
        jobname = "julia-$(getpid())"
        has_output_name = ("-o" in srunargs) || ("--output" in srunargs)
        if has_output_name
            # Redirect the user-chosen output file into job_file_loc.
            loc = findfirst(x -> x == "-o" || x == "--output", srunargs)
            job_output_name = srunargs[loc + 1]
            job_output_template = joinpath(job_file_loc, job_output_name)
            srunargs[loc + 1] = job_output_template
        else
            # Default: one output file per task, "%4t" is srun's zero-padded
            # task-number placeholder.
            job_output_name = "$(jobname)-$(trunc(Int, Base.time() * 10))"
            make_job_output_path(task_num) =
                joinpath(job_file_loc, "$(job_output_name)-$(task_num).out")
            job_output_template = make_job_output_path("%4t")
            push!(srunargs, "-o", job_output_template)
        end

        np = manager.np
        srun_cmd = `srun -J $jobname -n $np -D $exehome $(srunargs) $exename $exeflags $(worker_arg())`

        # FIX: removed debug leftover `global g_state = (; ..., make_job_output_path, ...)`.
        # Besides being mutable global state, it referenced make_job_output_path,
        # which is unbound when has_output_name is true (UndefVarError).

        @info "Starting SLURM job $jobname: $srun_cmd"
        srun_proc = open(srun_cmd)

        # Workers announce themselves as "host:port#ip.ip.ip.ip".
        # FIX: dots in the IPv4 part are escaped so `.` no longer matches
        # arbitrary characters.
        slurm_spec_regex = r"([\w]+):([\d]+)#(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"
        could_not_connect_regex = r"could not connect"
        exiting_regex = r"exiting\."

        retry_delays = manager.retry_delays

        t_start = time()
        t_waited = round(Int, time() - t_start)
        for i = 0:np - 1
            slurm_spec_match::Union{RegexMatch,Nothing} = nothing
            worker_errors = String[]
            if has_output_name
                fn = job_output_template
            else
                fn = make_job_output_path(lpad(i, 4, "0"))
            end
            # Trailing 0 gives one final, immediate re-check after the last delay.
            for retry_delay in push!(collect(retry_delays), 0)
                t_waited = round(Int, time() - t_start)

                # Wait for output log to be created and populated, then parse

                if isfile(fn)
                    if filesize(fn) > 0
                        open(fn) do f
                            # Due to error and warning messages, the specification
                            # may not appear on the file's first line
                            for line in eachline(f)
                                re_match = match(slurm_spec_regex, line)
                                if !isnothing(re_match)
                                    slurm_spec_match = re_match
                                end
                                for expr in [could_not_connect_regex, exiting_regex]
                                    if !isnothing(match(expr, line))
                                        # An error line invalidates any spec found so far.
                                        slurm_spec_match = nothing
                                        push!(worker_errors, line)
                                    end
                                end
                            end
                        end
                    end
                    if !isempty(worker_errors) || !isnothing(slurm_spec_match)
                        break # break if error or specification found
                    else
                        @info "Worker $i (after $t_waited s): Output file found, but no connection details yet"
                    end
                else
                    @info "Worker $i (after $t_waited s): No output file \"$fn\" yet"
                end

                # Sleep for some time to limit resource usage while waiting for the job to start
                sleep(retry_delay)
            end

            if !isempty(worker_errors)
                throw(SlurmException("Worker $i failed after $t_waited s: $(join(worker_errors, " "))"))
            elseif isnothing(slurm_spec_match)
                throw(SlurmException("Timeout after $t_waited s while waiting for worker $i to get ready."))
            end

            config = WorkerConfig()
            config.port = parse(Int, slurm_spec_match[2])
            config.host = strip(slurm_spec_match[3])
            @info "Worker $i ready after $t_waited s on host $(config.host), port $(config.port)"
            # Keep a reference to the proc, so it's properly closed once
            # the last worker exits.
            config.userdata = srun_proc
            push!(instances_arr, config)
            notify(c)
        end
    catch
        # FIX: @error instead of println; bare rethrow() preserves the
        # original backtrace (rethrow(e) resets it).
        @error "Error launching Slurm job"
        rethrow()
    end
end

"""
    manage(manager::SlurmManager, id::Integer, config::WorkerConfig, op::Symbol)

Required by the `ClusterManager` interface; no per-worker management
actions are needed for SLURM so far.
"""
function manage(manager::SlurmManager, id::Integer, config::WorkerConfig,
                op::Symbol)
    # This function needs to exist, but so far we don't do anything
end