Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

overwallclock fix #2041

Merged
merged 36 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
f4caec7
overwallclock fix
dbeltrankyl Dec 18, 2024
feacc91
Fix current tests
dbeltrankyl Dec 18, 2024
0e19b7f
Fix wrapper wallclock
dbeltrankyl Dec 18, 2024
b178a68
Fix wrapper wallclock
dbeltrankyl Dec 18, 2024
1cca091
changed time.time() x datetime.now()
dbeltrankyl Dec 18, 2024
f3ad3a4
Applied Bruno patch
dbeltrankyl Jan 15, 2025
5ca5cf0
Moved comment
dbeltrankyl Jan 15, 2025
f9deab8
feedback
dbeltrankyl Jan 15, 2025
fb820c6
fix change feedback
dbeltrankyl Jan 15, 2025
9689dec
fix change feedback
dbeltrankyl Jan 15, 2025
c66f785
fix change feedback
dbeltrankyl Jan 15, 2025
4533222
test
dbeltrankyl Jan 15, 2025
1523cf3
Checking coverage
dbeltrankyl Jan 16, 2025
5c04a88
fix older tests to take into account the wallclock
dbeltrankyl Jan 16, 2025
618e67f
more test
dbeltrankyl Jan 16, 2025
b848dce
another test
dbeltrankyl Jan 16, 2025
77325ec
another test
dbeltrankyl Jan 16, 2025
d9b0ebc
More tests
dbeltrankyl Jan 17, 2025
f837f6d
More tests
dbeltrankyl Jan 17, 2025
5079617
More tests
dbeltrankyl Jan 17, 2025
9c19726
More tests
dbeltrankyl Jan 17, 2025
c703745
More tests
dbeltrankyl Jan 17, 2025
b34961e
More tests
dbeltrankyl Jan 17, 2025
72a218f
More tests
dbeltrankyl Jan 17, 2025
169f5fb
Merge branch 'master' into 4.1.12-overwallclock-fix
dbeltrankyl Jan 21, 2025
396d94e
patch from bruno and added an additional assert
dbeltrankyl Jan 22, 2025
d3b9dcc
feedback
dbeltrankyl Jan 22, 2025
9f06df6
Changes from Bruno
dbeltrankyl Jan 22, 2025
914a7d6
feedback
dbeltrankyl Jan 22, 2025
cb93488
feedback
dbeltrankyl Jan 23, 2025
7716546
Merge remote-tracking branch 'origin/4.1.12-overwallclock-fix' into 4…
dbeltrankyl Jan 23, 2025
1d0271d
pep 8
dbeltrankyl Jan 23, 2025
804e6a2
removed unused job_status
dbeltrankyl Jan 23, 2025
8c95d42
unused import removed
dbeltrankyl Jan 23, 2025
285ce61
Wallclock added to adjusted parameters
dbeltrankyl Jan 23, 2025
075fc7f
Wallclock added to adjusted parameters
dbeltrankyl Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 37 additions & 45 deletions autosubmit/autosubmit.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from distutils.util import strtobool
from pathlib import Path
from ruamel.yaml import YAML
from typing import Dict, Set, Tuple, Union
from typing import Dict, Set, Tuple, Union, Any

from autosubmit.database.db_common import update_experiment_descrip_version
from autosubmit.helpers.parameters import PARAMETERS
Expand Down Expand Up @@ -1837,57 +1837,42 @@ def check_wrappers(as_conf, job_list, platforms_to_test, expid):
else:
jobs_to_check[platform.name] = [[job, job_prev_status]]
return jobs_to_check,job_changes_tracker

@staticmethod
def check_wrapper_stored_status(as_conf: Any, job_list: Any, wrapper_wallclock: str) -> Any:
    """
    Rebuild the wrapper jobs of a loaded job list from the stored status of their inner jobs.

    For every package in ``job_list.packages_dict`` a ``WrapperJob`` is created and
    registered in ``job_list.job_package_map`` under the id of the package's first job.
    If ``job_list`` has no ``packages_dict`` attribute it is returned untouched.

    :param as_conf: Configuration object handed to every rebuilt WrapperJob.
    :type as_conf: BasicConfig
    :param job_list: A JobList object.
    :type job_list: JobList
    :param wrapper_wallclock: The wallclock given to every rebuilt wrapper.
    :type wrapper_wallclock: str
    :return: The same JobList object, updated in place.
    :rtype: JobList
    """
    if not hasattr(job_list, "packages_dict"):
        return job_list

    # Kept function-local as in the original (presumably to avoid a circular import),
    # but resolved once instead of on every loop iteration.
    from .job.job import WrapperJob

    # Ordered by higher priority status; the first one found among the inner jobs wins.
    # FAILED ranks below RUNNING: it only applies once no inner job is running any more.
    status_priority = (Status.RUNNING, Status.FAILED, Status.QUEUING, Status.HELD, Status.SUBMITTED)

    for package_name, jobs in job_list.packages_dict.items():
        if all(job.status == Status.COMPLETED for job in jobs):
            wrapper_status = Status.COMPLETED
        else:
            # Falls back to SUBMITTED when no inner job is in any of the listed states.
            wrapper_status = next(
                (status for status in status_priority
                 if any(job.status == status for job in jobs)),
                Status.SUBMITTED,
            )

        first_job = jobs[0]
        wrapper_job = WrapperJob(package_name, first_job.id, wrapper_status, 0, jobs,
                                 wrapper_wallclock,
                                 None, first_job.platform, as_conf, first_job.hold)
        job_list.job_package_map[first_job.id] = wrapper_job
    return job_list
Expand Down Expand Up @@ -2059,12 +2044,14 @@ def prepare_run(expid, notransitive=False, start_time=None, start_after=None,
"job_packages not found", 6016, str(e))
Log.debug("Processing job packages")
try:
for (exp_id, package_name, job_name) in packages:
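# Fallback value; it only affects is_over_wallclock. NOTE(review): the loop below rebinds wrapper_wallclock, so this fallback is only used when `packages` is empty; otherwise the value of the last package row is the one passed to check_wrapper_stored_status.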
wrapper_wallclock = as_conf.experiment_data.get("CONFIG", {}).get("WRAPPERS_WALLCLOCK", "48:00")
for (exp_id, package_name, job_name, wrapper_wallclock) in packages:
if package_name not in job_list.packages_dict:
job_list.packages_dict[package_name] = []
job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name))
# This function checks the stored STATUS of the jobs inside wrappers, since the "wrapper status" is an in-memory variable.
job_list = Autosubmit.check_wrapper_stored_status(as_conf, job_list)
job_list = Autosubmit.check_wrapper_stored_status(as_conf, job_list, wrapper_wallclock)
except Exception as e:
raise AutosubmitCritical(
"Autosubmit failed while processing job packages. This might be due to a change in your experiment configuration files after 'autosubmit create' was performed.",
Expand Down Expand Up @@ -5750,10 +5737,15 @@ def load_job_list(expid, as_conf, notransitive=False, monitor=False, new = True)

if str(rerun).lower() == "true":
rerun_jobs = as_conf.get_rerun_jobs()
job_list.rerun(rerun_jobs,as_conf, monitor=monitor)
job_list.rerun(rerun_jobs, as_conf, monitor=monitor)
else:
job_list.remove_rerun_only_jobs(notransitive)

# Inspect -cw and Create -cw commands had issues at this point.
# Reset packed value on load so the jobs can be wrapped again.
for job in job_list.get_waiting() + job_list.get_ready():
job.packed = False

return job_list

@staticmethod
Expand Down
105 changes: 70 additions & 35 deletions autosubmit/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from functools import reduce
from threading import Thread
from time import sleep
from typing import List, Union, Dict, Any
from typing import List, Union, Dict, Any, Tuple
dbeltrankyl marked this conversation as resolved.
Show resolved Hide resolved

from autosubmit.helpers.parameters import autosubmit_parameter, autosubmit_parameters
from autosubmit.history.experiment_history import ExperimentHistory
Expand Down Expand Up @@ -219,7 +219,7 @@ def __init__(self, name, job_id, status, priority):
#: (int) Number of failed attempts to run this job. (FAIL_COUNT)
self._fail_count = 0
self.expid = name.split('_')[0] # type: str
self.parameters = None
self.parameters = dict()
self._tmp_path = os.path.join(
BasicConfig.LOCAL_ROOT_DIR, self.expid, BasicConfig.LOCAL_TMP_DIR)
self._log_path = Path(f"{self._tmp_path}/LOG_{self.expid}")
Expand Down Expand Up @@ -260,6 +260,7 @@ def __init__(self, name, job_id, status, priority):
self.ready_date = None
self.wrapper_name = None
self.is_wrapper = False
self._wallclock_in_seconds = None

def _adjust_new_parameters(self) -> None:
"""
Expand All @@ -273,6 +274,9 @@ def _adjust_new_parameters(self) -> None:
else:
self.is_wrapper = True
self.wrapper_name = "wrapped"
if not hasattr(self, '_wallclock_in_seconds'): # Added in 4.1.12
self._wallclock_in_seconds = None
self.wallclock_in_seconds = self.wallclock
kinow marked this conversation as resolved.
Show resolved Hide resolved

def _init_runtime_parameters(self):
# hetjobs
Expand All @@ -292,11 +296,26 @@ def _init_runtime_parameters(self):
self.stat_file = self.script_name[:-4] + "_STAT_0"


@property
def wallclock_in_seconds(self):
    """Wallclock limit in seconds as computed by the setter, or ``None`` if not set yet."""
    return self._wallclock_in_seconds

@wallclock_in_seconds.setter
def wallclock_in_seconds(self, wallclock):
    """
    Compute and store the wallclock limit in seconds from a wallclock string.

    The stored value is left untouched when it is already set and the job is
    RUNNING, QUEUING or SUBMITTED. An empty or "00:00" wallclock, or one that
    ``parse_time`` cannot parse, falls back to ``CONFIG.JOB_WALLCLOCK`` from the
    job parameters (default "24:00").

    :param wallclock: Wallclock string, e.g. "01:30" or "01:30:00".
    :type wallclock: str
    """
    if self._wallclock_in_seconds and self.status in [Status.RUNNING, Status.QUEUING, Status.SUBMITTED]:
        return

    # parameters may be missing or None (assumption: jobs stored before it defaulted to a dict).
    parameters = getattr(self, "parameters", None) or {}
    default_wallclock = parameters.get("CONFIG.JOB_WALLCLOCK", "24:00")

    # Should always take the max_wallclock set in the platform, this is set as fallback ( and local platform doesn't have a max_wallclock defined )
    if not wallclock or wallclock == "00:00":
        wallclock = default_wallclock
        Log.warning(f"No wallclock is set for this job. Default to {wallclock}. You can change this value in CONFIG.JOB_WALLCLOCK")

    parsed = self.parse_time(wallclock)
    if parsed is None:
        # parse_time returns None for text it cannot parse; do not pass that None on.
        Log.warning(f"Wallclock '{wallclock}' could not be parsed. Default to {default_wallclock}. You can change this value in CONFIG.JOB_WALLCLOCK")
        parsed = self.parse_time(default_wallclock) or self.parse_time("24:00")
    self._wallclock_in_seconds = self._time_in_seconds_and_margin(parsed)
kinow marked this conversation as resolved.
Show resolved Hide resolved

@property
@autosubmit_parameter(name='x11')
def x11(self):
    """Whether to use X11 forwarding"""
    return self._x11

@x11.setter
def x11(self, value):
    # Stores the value as given; no validation or coercion happens here.
    self._x11 = value
Expand Down Expand Up @@ -1270,49 +1289,61 @@ def retrieve_logfiles(self, platform: Any, raise_error: bool = False) -> Dict[st
Log.result(f"{platform.name}(log_recovery) Successfully recovered log for job '{self.name}' and retry '{self.fail_count}'.")


def parse_time(self,wallclock):
def _max_possible_wallclock(self):
if self.platform and self.platform.max_wallclock:
wallclock = self.parse_time(self.platform.max_wallclock)
kinow marked this conversation as resolved.
Show resolved Hide resolved
if wallclock:
return int(wallclock.total_seconds())
return None

def _time_in_seconds_and_margin(self, wallclock: datetime.timedelta) -> int:
"""
Calculate the total wallclock time in seconds and the wallclock time with a margin.

This method increases the given wallclock time by 30%.
It then converts the total wallclock time to seconds and returns both the total
wallclock time in seconds and the wallclock time with the margin as a timedelta.
kinow marked this conversation as resolved.
Show resolved Hide resolved

:param wallclock: The original wallclock time.
:type wallclock: datetime.timedelta

:return int: The total wallclock time in seconds.
"""
total = int(wallclock.total_seconds() * 1.30)
total_platform = self._max_possible_wallclock()
if not total_platform:
total_platform = total
if total > total_platform:
Log.warning(f"Job {self.name} has a wallclock time '{total} seconds' higher than the maximum allowed by the platform '{total_platform} seconds' "
f"Setting wallclock time to the maximum allowed by the platform.")
total = total_platform
wallclock_delta = datetime.timedelta(seconds=total)
return int(wallclock_delta.total_seconds())

@staticmethod
def parse_time(wallclock):
    """
    Parse a wallclock string of the form ``HH:MM`` or ``HH:MM:SS`` into a timedelta.

    Matching starts at the beginning of the string and is not anchored at the end,
    so anything after the matched part is ignored. Hours are not limited to 23
    (e.g. "48:00" yields two days).

    :param wallclock: Wallclock string.
    :type wallclock: str
    :return: The parsed duration, or ``None`` when the input is not a string or does
        not start with ``<digits>:<digits>``.
    :rtype: Union[datetime.timedelta, None]
    """
    # None or any other non-string value cannot be parsed.
    if not isinstance(wallclock, str):
        return None
    regex = re.compile(r'(((?P<hours>\d+):)((?P<minutes>\d+)))(:(?P<seconds>\d+))?')
    parts = regex.match(wallclock)
    if not parts:
        return None
    # The optional seconds group is None when absent and is left out.
    time_params = {name: int(value) for name, value in parts.groupdict().items() if value}
    return datetime.timedelta(**time_params)

# TODO Duplicated for wrappers and jobs to fix in 4.1.X but in wrappers is called _is_over_wallclock for unknown reasons
def is_over_wallclock(self):
    """
    Check if the job is over the wallclock time, it is an alternative method to avoid platform issues.

    The time elapsed since ``self.start_time`` is compared, in whole seconds, against
    ``self.wallclock_in_seconds``, which is computed from ``self.wallclock`` first
    when it is not set yet.

    :return: True if the elapsed time exceeds the wallclock limit, False otherwise.
    :rtype: bool
    """
    start_time = self.start_time
    if isinstance(start_time, (int, float)):
        # start_time used to be stored as time.time(); convert such epoch values
        # (assumption: they can still come from previously saved jobs).
        start_time = datetime.datetime.fromtimestamp(start_time)
    elapsed = datetime.datetime.now() - start_time
    if not self.wallclock_in_seconds:
        self.wallclock_in_seconds = self.wallclock
    if int(elapsed.total_seconds()) > self.wallclock_in_seconds:
        Log.warning(f"Job {self.name} is over wallclock time, Autosubmit will check if it is completed")
        return True
    return False

Expand Down Expand Up @@ -1713,6 +1744,8 @@ def update_platform_associated_parameters(self, as_conf, parameters, job_platfor
self.memory = parameters.get("CURRENT_MEMORY", "")
self.memory_per_task = parameters.get("CURRENT_MEMORY_PER_TASK", parameters.get("CURRENT_MEMORY_PER_TASK", ""))
self.wallclock = parameters.get("CURRENT_WALLCLOCK", parameters.get("CURRENT_MAX_WALLCLOCK", None))
if self.status in [Status.READY, Status.PREPARED]:
self.wallclock_in_seconds = self.wallclock
self.custom_directives = parameters.get("CURRENT_CUSTOM_DIRECTIVES", "")
self.process_scheduler_parameters(job_platform, chunk)
if self.het.get('HETSIZE', 1) > 1:
Expand Down Expand Up @@ -2049,8 +2082,8 @@ def update_parameters(self, as_conf, parameters,
as_conf.reload()
self._adjust_new_parameters()
self._init_runtime_parameters()
if hasattr(self, "start_time"):
self.start_time = time.time()
if not hasattr(self, "start_time"):
self.start_time = datetime.datetime.now()
# Parameters that affect to all the rest of parameters
self.update_dict_parameters(as_conf)
parameters = parameters.copy()
Expand Down Expand Up @@ -2607,7 +2640,7 @@ def __init__(self, name, job_id, status, priority, job_list, total_wallclock, nu
self.failed = False
self.job_list = job_list
# divide jobs in dictionary by state?
self.wallclock = total_wallclock
self.wallclock = total_wallclock # Now it is reloaded after a run -> stop -> run
self.num_processors = num_processors
self.running_jobs_start = OrderedDict()
self._platform = platform
Expand All @@ -2617,6 +2650,8 @@ def __init__(self, name, job_id, status, priority, job_list, total_wallclock, nu
self.hold = hold
self.inner_jobs_running = list()
self.is_wrapper = True
if not self.wallclock_in_seconds:
self.wallclock_in_seconds = self._wallclock


def _queuing_reason_cancel(self, reason):
Expand Down
3 changes: 1 addition & 2 deletions autosubmit/job/job_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -3034,8 +3034,7 @@ def save_wrappers(self, packages_to_save, failed_packages, as_conf, packages_per
self.job_package_map[package.jobs[0].id] = wrapper_job
if isinstance(package, JobPackageThread):
# Saving only when it is a real multi job package
packages_persistence.save(
package.name, package.jobs, package._expid, inspect)
packages_persistence.save(package, inspect) # Need to store the wallclock for the is_overwallclock function

def check_scripts(self, as_conf):
"""
Expand Down
Loading
Loading