Skip to content

Commit

Permalink
Merge pull request #264 from cta-observatory/nice_parameter
Browse files Browse the repository at this point in the history
Setting a nice parameter (the job priority).
  • Loading branch information
jsitarek authored Oct 7, 2024
2 parents 450617a + cda74c7 commit 4b3e5b3
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
__all__ = ["slurm_lines", "rc_lines"]


def slurm_lines(queue, job_name, array=None, mem=None, out_name=None):
def slurm_lines(
queue, job_name, nice_parameter=None, array=None, mem=None, out_name=None
):

"""
Function for creating the general header lines with which slurm scripts start.
Expand All @@ -14,6 +17,8 @@ def slurm_lines(queue, job_name, array=None, mem=None, out_name=None):
Name of the queue
job_name : str
Job name
nice_parameter : int or None
Job priority
array : None or int
If not None, a job array from 0 to `array` will be made
mem : None or str
Expand All @@ -35,6 +40,7 @@ def slurm_lines(queue, job_name, array=None, mem=None, out_name=None):
"#SBATCH -n 1\n\n",
f"#SBATCH --output={out_name}.out\n" if out_name is not None else "",
f"#SBATCH --error={out_name}.err\n\n" if out_name is not None else "",
f"#SBATCH --nice={nice_parameter}\n" if nice_parameter is not None else "",
"ulimit -l unlimited\n",
"ulimit -s unlimited\n",
"ulimit -a\n\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ def configfile_coincidence(target_dir, source_name, config_file):
yaml.dump(conf, f, default_flow_style=False)


def linking_bash_lst(target_dir, LST_runs, source_name, LST_version, env_name, cluster):

def linking_bash_lst(
target_dir, LST_runs, source_name, LST_version, env_name, cluster, nice
):
"""
This function links the LST data paths to the working directory and creates bash scripts.
Expand All @@ -92,6 +93,8 @@ def linking_bash_lst(target_dir, LST_runs, source_name, LST_version, env_name, c
Name of the conda environment
cluster : str
Cluster system
nice : int or None
Job priority
"""

coincidence_DL1_dir = f"{target_dir}/v{__version__}/{source_name}"
Expand Down Expand Up @@ -140,6 +143,7 @@ def linking_bash_lst(target_dir, LST_runs, source_name, LST_version, env_name, c
slurm = slurm_lines(
queue="short",
job_name=f"{source_name}_coincidence",
nice_parameter=nice,
array=process_size,
mem="6g",
out_name=f"{outputdir}/logs/slurm-%x.%A_%a",
Expand Down Expand Up @@ -194,6 +198,7 @@ def main():
env_name = config["general"]["env_name"]
LST_version = config["general"]["LST_version"]
config_file = config["general"]["base_config_file"]
nice_parameter = config["general"]["nice"] if "nice" in config["general"] else None

source_in = config["data_selection"]["source_name_database"]
source = config["data_selection"]["source_name_output"]
Expand Down Expand Up @@ -227,6 +232,7 @@ def main():
LST_version,
env_name,
cluster,
nice_parameter,
) # linking the data paths to current working directory

print("***** Submitting processess to the cluster...")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
mc_tel_ids:
LST-1: 1
LST-2: 0
LST-3: 0
LST-4: 0
MAGIC-I: 2
MAGIC-II: 3

directories:
workspace_dir: "/fefs/aswg/workspace/elisa.visentin/auto_MCP_PR/" # Output directory where all the data products will be saved.

Expand All @@ -21,4 +29,6 @@ general:
nsb: [0.5, 1.0, 1.5, 2.0, 2.5, 3.0]
env_name: magic-lst # name of the conda environment to be used to process data.
cluster: "SLURM" # cluster management system on which data are processed. At the moment we have only SLURM available, in the future maybe also condor (PIC, CNAF).
nice: # Set the job priority (positive integer values only). A lower nice value increases the priority, while a higher value reduces it.


Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def config_file_gen(target_dir, source_name, config_file):


def lists_and_bash_gen_MAGIC(
target_dir, telescope_ids, MAGIC_runs, source, env_name, cluster
target_dir, telescope_ids, MAGIC_runs, source, env_name, cluster, nice
):

"""
Expand All @@ -95,6 +95,8 @@ def lists_and_bash_gen_MAGIC(
Name of the environment
cluster : str
Cluster system
nice : int or None
Job priority
"""
if cluster != "SLURM":
logger.warning(
Expand All @@ -105,6 +107,7 @@ def lists_and_bash_gen_MAGIC(
lines = slurm_lines(
queue="short",
job_name=process_name,
nice_parameter=nice,
out_name=f"{target_dir}/v{__version__}/{source}/DL1/slurm-linkMAGIC-%x.%j",
)

Expand Down Expand Up @@ -134,6 +137,7 @@ def lists_and_bash_gen_MAGIC(
slurm = slurm_lines(
queue="short",
job_name=process_name,
nice_parameter=nice,
array=number_of_nodes,
mem="2g",
out_name=f"{target_dir}/v{__version__}/{source}/DL1/M{magic}/{i[0]}/{i[1]}/logs/slurm-%x.%A_%a",
Expand Down Expand Up @@ -220,6 +224,7 @@ def main():
source = config["data_selection"]["source_name_output"]
cluster = config["general"]["cluster"]
target_dir = Path(config["directories"]["workspace_dir"])
nice_parameter = config["general"]["nice"] if "nice" in config["general"] else None

if source_in is None:
source_list = joblib.load("list_sources.dat")
Expand Down Expand Up @@ -255,6 +260,7 @@ def main():
source_name,
env_name,
cluster,
nice_parameter,
) # MAGIC real data
if (telescope_ids[-2] > 0) or (telescope_ids[-1] > 0):
list_of_MAGIC_runs = glob.glob(f"{source_name}_MAGIC-*.sh")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
logger.setLevel(logging.INFO)


def MergeStereo(target_dir, env_name, source, cluster):
def MergeStereo(target_dir, env_name, source, cluster, nice):
"""
This function creates the bash scripts to run merge_hdf_files.py in all DL1Stereo subruns.
Expand All @@ -41,6 +41,8 @@ def MergeStereo(target_dir, env_name, source, cluster):
Name of the target
cluster : str
Cluster system
nice : int or None
Job priority
"""

process_name = source
Expand All @@ -62,6 +64,7 @@ def MergeStereo(target_dir, env_name, source, cluster):
slurm = slurm_lines(
queue="short",
job_name=f"{process_name}_stereo_merge",
nice_parameter=nice,
mem="2g",
out_name=f"{stereoMergeDir}/logs/slurm-%x.%A_%a",
)
Expand Down Expand Up @@ -109,6 +112,7 @@ def main():
source_in = config["data_selection"]["source_name_database"]
source = config["data_selection"]["source_name_output"]
cluster = config["general"]["cluster"]
nice_parameter = config["general"]["nice"] if "nice" in config["general"] else None

if source_in is None:
source_list = joblib.load("list_sources.dat")
Expand All @@ -121,7 +125,7 @@ def main():
for source_name in source_list:

print("***** Merging DL1Stereo files run-wise...")
MergeStereo(target_dir, env_name, source_name, cluster)
MergeStereo(target_dir, env_name, source_name, cluster, nice_parameter)

list_of_merge = glob.glob(f"{source_name}_StereoMerge_*.sh")
if len(list_of_merge) < 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
logger.setLevel(logging.INFO)


def merge(target_dir, MAGIC_runs, env_name, source, cluster):
def merge(target_dir, MAGIC_runs, env_name, source, cluster, nice):

"""
This function creates the bash scripts to run merge_hdf_files.py for real data
Expand All @@ -50,6 +50,8 @@ def merge(target_dir, MAGIC_runs, env_name, source, cluster):
Target name
cluster : str
Cluster system
nice : int or None
Job priority
"""

process_name = f"merging_{source}"
Expand All @@ -64,6 +66,7 @@ def merge(target_dir, MAGIC_runs, env_name, source, cluster):
lines = slurm_lines(
queue="short",
job_name=process_name,
nice_parameter=nice,
mem="2g",
out_name=f"{MAGIC_DL1_dir}/Merged/logs/slurm-%x.%j",
)
Expand Down Expand Up @@ -120,6 +123,7 @@ def main():
source_in = config["data_selection"]["source_name_database"]
source = config["data_selection"]["source_name_output"]
cluster = config["general"]["cluster"]
nice_parameter = config["general"]["nice"] if "nice" in config["general"] else None

if source_in is None:
source_list = joblib.load("list_sources.dat")
Expand All @@ -144,6 +148,7 @@ def main():
env_name,
source_name,
cluster,
nice_parameter,
) # generating the bash script to merge the subruns

print("***** Running merge_hdf_files.py on the MAGIC data files...")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def configfile_stereo(target_dir, source_name, config_file):
yaml.dump(conf, f, default_flow_style=False)


def bash_stereo(target_dir, source, env_name, cluster):
def bash_stereo(target_dir, source, env_name, cluster, nice):

"""
This function generates the bashscripts for running the stereo analysis.
Expand All @@ -78,6 +78,8 @@ def bash_stereo(target_dir, source, env_name, cluster):
Name of the environment
cluster : str
Cluster system
nice : int or None
Job priority
"""

process_name = source
Expand Down Expand Up @@ -111,6 +113,7 @@ def bash_stereo(target_dir, source, env_name, cluster):
slurm = slurm_lines(
queue="short",
job_name=f"{process_name}_stereo",
nice_parameter=nice,
array=process_size,
mem="2g",
out_name=f"{stereoDir}/logs/slurm-%x.%A_%a",
Expand Down Expand Up @@ -166,6 +169,7 @@ def main():
source = config["data_selection"]["source_name_output"]

cluster = config["general"]["cluster"]
nice_parameter = config["general"]["nice"] if "nice" in config["general"] else None

if source_in is None:
source_list = joblib.load("list_sources.dat")
Expand All @@ -182,7 +186,7 @@ def main():
# Below we run the analysis on the real data

print("***** Generating the bashscript...")
bash_stereo(target_dir, source_name, env_name, cluster)
bash_stereo(target_dir, source_name, env_name, cluster, nice_parameter)

print("***** Submitting processess to the cluster...")
print(f"Process name: {source_name}_stereo")
Expand Down

0 comments on commit 4b3e5b3

Please sign in to comment.