Skip to content

Commit

Permalink
4.1.12 - overwallclock fix (#2041)
Browse files Browse the repository at this point in the history
* overwallclock fix

* Fix current tests

* Fix wrapper wallclock

* Fix wrapper wallclock

* changed time.time() x datetime.now()

* Applied Bruno patch

* Moved comment

* feedback

* fix change feedback

* fix change feedback

* fix change feedback

* test

* Checking coverage

* fix older tests to take into account the wallclock

* more test

* another test

* another test

* More tests

* More tests

* More tests

* More tests

* More tests

* More tests

* More tests

* patch from bruno and added an additional assert

* feedback

* Changes from Bruno

* feedback

* feedback

* pep 8

* removed unused job_status

* unused import removed

* Wallclock added to adjusted parameters

* Wallclock added to adjusted parameters
  • Loading branch information
dbeltrankyl authored Jan 23, 2025
1 parent f7c8c83 commit 0f029bd
Show file tree
Hide file tree
Showing 21 changed files with 444 additions and 162 deletions.
82 changes: 37 additions & 45 deletions autosubmit/autosubmit.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from distutils.util import strtobool
from pathlib import Path
from ruamel.yaml import YAML
from typing import Dict, Set, Tuple, Union
from typing import Dict, Set, Tuple, Union, Any

from autosubmit.database.db_common import update_experiment_descrip_version
from autosubmit.helpers.parameters import PARAMETERS
Expand Down Expand Up @@ -1837,57 +1837,42 @@ def check_wrappers(as_conf, job_list, platforms_to_test, expid):
else:
jobs_to_check[platform.name] = [[job, job_prev_status]]
return jobs_to_check,job_changes_tracker

@staticmethod
def check_wrapper_stored_status(as_conf: Any, job_list: Any, wrapper_wallclock: str) -> Any:
    """
    Check if the wrapper job has been submitted and the inner jobs are in the queue after a load.

    Rebuilds the in-memory wrapper status from the stored inner-job statuses,
    since the wrapper status itself is a memory-only variable.

    :param as_conf: A BasicConfig object.
    :type as_conf: BasicConfig
    :param job_list: A JobList object.
    :type job_list: JobList
    :param wrapper_wallclock: The wallclock of the wrapper.
    :type wrapper_wallclock: str
    :return: Updated JobList object.
    :rtype: JobList
    """
    # Only job lists loaded with wrapper packages carry this attribute.
    if hasattr(job_list, "packages_dict"):
        # Default when no inner-job status gives better information.
        wrapper_status = Status.SUBMITTED
        for package_name, jobs in job_list.packages_dict.items():
            # Local import to avoid a circular import at module load time.
            from .job.job import WrapperJob
            # Ordered by higher priority status
            if all(job.status == Status.COMPLETED for job in jobs):
                wrapper_status = Status.COMPLETED
            elif any(job.status == Status.RUNNING for job in jobs):
                wrapper_status = Status.RUNNING
            elif any(job.status == Status.FAILED for job in jobs):  # No more inner jobs running but inner job in failed
                wrapper_status = Status.FAILED
            elif any(job.status == Status.QUEUING for job in jobs):
                wrapper_status = Status.QUEUING
            elif any(job.status == Status.HELD for job in jobs):
                wrapper_status = Status.HELD
            elif any(job.status == Status.SUBMITTED for job in jobs):
                wrapper_status = Status.SUBMITTED

            # Store the wallclock so is_over_wallclock checks survive a reload.
            wrapper_job = WrapperJob(package_name, jobs[0].id, wrapper_status, 0, jobs,
                                     wrapper_wallclock,
                                     None, jobs[0].platform, as_conf, jobs[0].hold)
            job_list.job_package_map[jobs[0].id] = wrapper_job
    return job_list
Expand Down Expand Up @@ -2059,12 +2044,14 @@ def prepare_run(expid, notransitive=False, start_time=None, start_after=None,
"job_packages not found", 6016, str(e))
Log.debug("Processing job packages")
try:
for (exp_id, package_name, job_name) in packages:
# Fallback value; only affects is_over_wallclock
wrapper_wallclock = as_conf.experiment_data.get("CONFIG", {}).get("WRAPPERS_WALLCLOCK", "48:00")
for (exp_id, package_name, job_name, wrapper_wallclock) in packages:
if package_name not in job_list.packages_dict:
job_list.packages_dict[package_name] = []
job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name))
# This function, checks the stored STATUS of jobs inside wrappers. Since "wrapper status" is a memory variable.
job_list = Autosubmit.check_wrapper_stored_status(as_conf, job_list)
job_list = Autosubmit.check_wrapper_stored_status(as_conf, job_list, wrapper_wallclock)
except Exception as e:
raise AutosubmitCritical(
"Autosubmit failed while processing job packages. This might be due to a change in your experiment configuration files after 'autosubmit create' was performed.",
Expand Down Expand Up @@ -5750,10 +5737,15 @@ def load_job_list(expid, as_conf, notransitive=False, monitor=False, new = True)

if str(rerun).lower() == "true":
rerun_jobs = as_conf.get_rerun_jobs()
job_list.rerun(rerun_jobs,as_conf, monitor=monitor)
job_list.rerun(rerun_jobs, as_conf, monitor=monitor)
else:
job_list.remove_rerun_only_jobs(notransitive)

# Inspect -cw and Create -cw commands had issues at this point.
# Reset packed value on load so the jobs can be wrapped again.
for job in job_list.get_waiting() + job_list.get_ready():
job.packed = False

return job_list

@staticmethod
Expand Down
100 changes: 65 additions & 35 deletions autosubmit/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def __init__(self, name, job_id, status, priority):
#: (int) Number of failed attempts to run this job. (FAIL_COUNT)
self._fail_count = 0
self.expid = name.split('_')[0] # type: str
self.parameters = None
self.parameters = dict()
self._tmp_path = os.path.join(
BasicConfig.LOCAL_ROOT_DIR, self.expid, BasicConfig.LOCAL_TMP_DIR)
self._log_path = Path(f"{self._tmp_path}/LOG_{self.expid}")
Expand Down Expand Up @@ -260,6 +260,7 @@ def __init__(self, name, job_id, status, priority):
self.ready_date = None
self.wrapper_name = None
self.is_wrapper = False
self._wallclock_in_seconds = None

def _adjust_new_parameters(self) -> None:
"""
Expand All @@ -277,6 +278,10 @@ def _adjust_new_parameters(self) -> None:
if not hasattr(self, "_log_path"): # Added in 4.1.12
self._log_path = Path(f"{self._tmp_path}/LOG_{self.expid}")

if not hasattr(self, "_wallclock_in_seconds"): # Added in 4.1.12
self._wallclock_in_seconds = None
self.wallclock = self.wallclock # also sets the wallclock in seconds

def _init_runtime_parameters(self):
# hetjobs
self.het = {'HETSIZE': 0}
Expand All @@ -295,11 +300,16 @@ def _init_runtime_parameters(self):
self.stat_file = self.script_name[:-4] + "_STAT_0"


@property
def wallclock_in_seconds(self):
    # Wallclock padded with a safety margin, expressed in whole seconds.
    # Computed by the `wallclock` setter; None until a wallclock is assigned.
    return self._wallclock_in_seconds

@property
@autosubmit_parameter(name='x11')
def x11(self):
    """Whether to use X11 forwarding"""
    return self._x11

@x11.setter
def x11(self, value):
    # Store the X11 forwarding flag as given; no validation is performed here.
    self._x11 = value
Expand Down Expand Up @@ -436,6 +446,16 @@ def wallclock(self):
def wallclock(self, value):
    # Setter for `wallclock`: stores the raw "HH:MM" string and derives the
    # padded seconds value used by over-wallclock checks.
    self._wallclock = value

    # Recompute only when no seconds value exists yet or the job is not in
    # flight; queued/submitted/running jobs keep the value they started with.
    if not self._wallclock_in_seconds or self.status not in [Status.RUNNING, Status.QUEUING, Status.SUBMITTED]:
        # Should always take the max_wallclock set in the platform, this is set as fallback
        # (and local platform doesn't have a max_wallclock defined)
        if not self._wallclock or self._wallclock == "00:00":
            self._wallclock = self.parameters.get("CONFIG.JOB_WALLCLOCK", "24:00")
            Log.warning(f"No wallclock is set for this job. Default to {self._wallclock}. "
                        "You can change this value in CONFIG.WALLCLOCK")
        wallclock_parsed = self.parse_time(self._wallclock)
        self._wallclock_in_seconds = self._time_in_seconds_and_margin(wallclock_parsed)

@property
@autosubmit_parameter(name='hyperthreading')
def hyperthreading(self):
Expand Down Expand Up @@ -718,7 +738,7 @@ def platform(self):
"""
Returns the platform to be used by the job. Chooses between serial and parallel platforms
:return HPCPlatform object for the job to use
:return: HPCPlatform object for the job to use
:rtype: HPCPlatform
"""
if self.is_serial and self._platform:
Expand Down Expand Up @@ -1273,49 +1293,59 @@ def retrieve_logfiles(self, platform: Any, raise_error: bool = False) -> Dict[st
Log.result(f"{platform.name}(log_recovery) Successfully recovered log for job '{self.name}' and retry '{self.fail_count}'.")


def parse_time(self,wallclock):
def _max_possible_wallclock(self):
if self.platform and self.platform.max_wallclock:
wallclock = self.parse_time(self.platform.max_wallclock)
if wallclock:
return int(wallclock.total_seconds())
return None

def _time_in_seconds_and_margin(self, wallclock: datetime.timedelta) -> int:
"""
Calculate the total wallclock time in seconds and the wallclock time with a margin.
This method increases the given wallclock time by 30%.
It then converts the total wallclock time to seconds and returns both the total
wallclock time in seconds and the wallclock time with the margin as a timedelta.
:param wallclock: The original wallclock time.
:type wallclock: datetime.timedelta
:return int: The total wallclock time in seconds.
"""
total = int(wallclock.total_seconds() * 1.30)
total_platform = self._max_possible_wallclock()
if not total_platform:
total_platform = total
if total > total_platform:
Log.warning(f"Job {self.name} has a wallclock time '{total} seconds' higher than the maximum allowed by the platform '{total_platform} seconds' "
f"Setting wallclock time to the maximum allowed by the platform.")
total = total_platform
wallclock_delta = datetime.timedelta(seconds=total)
return int(wallclock_delta.total_seconds())

@staticmethod
def parse_time(wallclock):
regex = re.compile(r'(((?P<hours>\d+):)((?P<minutes>\d+)))(:(?P<seconds>\d+))?')
parts = regex.match(wallclock)
if not parts:
return
return None
parts = parts.groupdict()
if int(parts['hours']) > 0 :
format_ = "hour"
else:
format_ = "minute"
time_params = {}
for name, param in parts.items():
if param:
time_params[name] = int(param)
return datetime.timedelta(**time_params),format_
return datetime.timedelta(**time_params)

# TODO Duplicated for wrappers and jobs to fix in 4.1.X but in wrappers is called _is_over_wallclock for unknown reasons
def is_over_wallclock(self):
    """
    Check if the job is over the wallclock time.

    Alternative to querying the platform, so platform issues do not hide
    over-wallclock jobs. The diff-residue duplicate definition taking
    (start_time, wallclock) — whose body referenced those now-removed
    parameters — was dropped in favor of this zero-argument form.

    :return: True when the elapsed time since ``self.start_time`` exceeds
        ``self.wallclock_in_seconds`` (already padded by 30%), False otherwise.
    :rtype: bool
    """
    elapsed = datetime.datetime.now() - self.start_time
    if int(elapsed.total_seconds()) > self.wallclock_in_seconds:
        Log.warning(f"Job {self.name} is over wallclock time, Autosubmit will check if it is completed")
        return True
    return False

Expand Down Expand Up @@ -2052,8 +2082,8 @@ def update_parameters(self, as_conf, parameters,
as_conf.reload()
self._adjust_new_parameters()
self._init_runtime_parameters()
if hasattr(self, "start_time"):
self.start_time = time.time()
if not hasattr(self, "start_time"):
self.start_time = datetime.datetime.now()
# Parameters that affect to all the rest of parameters
self.update_dict_parameters(as_conf)
parameters = parameters.copy()
Expand Down Expand Up @@ -2610,7 +2640,7 @@ def __init__(self, name, job_id, status, priority, job_list, total_wallclock, nu
self.failed = False
self.job_list = job_list
# divide jobs in dictionary by state?
self.wallclock = total_wallclock
self.wallclock = total_wallclock # Now it is reloaded after a run -> stop -> run
self.num_processors = num_processors
self.running_jobs_start = OrderedDict()
self._platform = platform
Expand Down
3 changes: 1 addition & 2 deletions autosubmit/job/job_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -3034,8 +3034,7 @@ def save_wrappers(self, packages_to_save, failed_packages, as_conf, packages_per
self.job_package_map[package.jobs[0].id] = wrapper_job
if isinstance(package, JobPackageThread):
# Saving only when it is a real multi job package
packages_persistence.save(
package.name, package.jobs, package._expid, inspect)
packages_persistence.save(package, inspect) # Need to store the wallclock for the is_over_wallclock function

def check_scripts(self, as_conf):
"""
Expand Down
Loading

0 comments on commit 0f029bd

Please sign in to comment.