diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index b21e32f0d..2c1c2ced1 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -26,7 +26,7 @@
 from distutils.util import strtobool
 from pathlib import Path
 from ruamel.yaml import YAML
-from typing import Dict, Set, Tuple, Union
+from typing import Dict, Set, Tuple, Union, Any
 
 from autosubmit.database.db_common import update_experiment_descrip_version
 from autosubmit.helpers.parameters import PARAMETERS
@@ -1837,57 +1837,42 @@ def check_wrappers(as_conf, job_list, platforms_to_test, expid):
                 else:
                     jobs_to_check[platform.name] = [[job, job_prev_status]]
         return jobs_to_check,job_changes_tracker
+
     @staticmethod
-    def check_wrapper_stored_status(as_conf,job_list):
+    def check_wrapper_stored_status(as_conf: Any, job_list: Any, wrapper_wallclock: str) -> Any:
         """
-        Check if the wrapper job has been submitted and the inner jobs are in the queue.
-        :param as_conf: a BasicConfig object
-        :param job_list: a JobList object
-        :return: JobList object updated
+        Check if the wrapper job has been submitted and the inner jobs are in the queue after a load.
+
+        :param as_conf: A BasicConfig object.
+        :type as_conf: BasicConfig
+        :param job_list: A JobList object.
+        :type job_list: JobList
+        :param wrapper_wallclock: The wallclock of the wrapper.
+        :type wrapper_wallclock: str
+        :return: Updated JobList object.
+        :rtype: JobList
        """
        # Only if the packages_dict attribute is present in job_list
        if hasattr(job_list, "packages_dict"):
+            wrapper_status = Status.SUBMITTED
             for package_name, jobs in job_list.packages_dict.items():
                 from .job.job import WrapperJob
-                wrapper_status = Status.SUBMITTED
-                all_completed = True
-                running = False
-                queuing = False
-                failed = False
-                hold = False
-                submitted = False
-                if jobs[0].status == Status.RUNNING or jobs[0].status == Status.COMPLETED:
-                    running = True
-                for job in jobs:
-                    if job.status == Status.QUEUING:
-                        queuing = True
-                        all_completed = False
-                    elif job.status == Status.FAILED:
-                        failed = True
-                        all_completed = False
-                    elif job.status == Status.HELD:
-                        hold = True
-                        all_completed = False
-                    elif job.status == Status.SUBMITTED:
-                        submitted = True
-                        all_completed = False
-                if all_completed:
+                # Ordered from highest to lowest status priority
+                if all(job.status == Status.COMPLETED for job in jobs):
                     wrapper_status = Status.COMPLETED
-                elif hold:
+                elif any(job.status == Status.RUNNING for job in jobs):
+                    wrapper_status = Status.RUNNING
+                elif any(job.status == Status.FAILED for job in jobs):  # No inner job still running, but at least one inner job failed
+                    wrapper_status = Status.FAILED
+                elif any(job.status == Status.QUEUING for job in jobs):
+                    wrapper_status = Status.QUEUING
+                elif any(job.status == Status.HELD for job in jobs):
                     wrapper_status = Status.HELD
-                else:
-                    if running:
-                        wrapper_status = Status.RUNNING
-                    elif queuing:
-                        wrapper_status = Status.QUEUING
-                    elif submitted:
-                        wrapper_status = Status.SUBMITTED
-                    elif failed:
-                        wrapper_status = Status.FAILED
-                    else:
-                        wrapper_status = Status.SUBMITTED
+                elif any(job.status == Status.SUBMITTED for job in jobs):
+                    wrapper_status = Status.SUBMITTED
+
                 wrapper_job = WrapperJob(package_name, jobs[0].id, wrapper_status, 0, jobs,
-                                         None,
+                                         wrapper_wallclock,
                                          None, jobs[0].platform, as_conf, jobs[0].hold)
                 job_list.job_package_map[jobs[0].id] = wrapper_job
         return job_list
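Since the `all()`/`any()` ladder above replaces five boolean flags, it may help to see the rule in isolation: COMPLETED only when every inner job completed, otherwise the highest-priority status present among RUNNING > FAILED > QUEUING > HELD > SUBMITTED. A condensed, standalone sketch (the Status constants here are stand-ins, not Autosubmit's real values):

```python
# Condensed sketch of the wrapper-status aggregation above; stand-in constants.
COMPLETED, RUNNING, FAILED, QUEUING, HELD, SUBMITTED = range(6)

def aggregate_wrapper_status(inner_statuses):
    """Derive a single wrapper status from the inner jobs, highest priority first."""
    if inner_statuses and all(s == COMPLETED for s in inner_statuses):
        return COMPLETED
    for status in (RUNNING, FAILED, QUEUING, HELD, SUBMITTED):
        if any(s == status for s in inner_statuses):
            return status
    return SUBMITTED  # fallback, mirrors the initial value in the method above

assert aggregate_wrapper_status([COMPLETED, FAILED, QUEUING]) == FAILED
```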
@@ -2059,12 +2044,14 @@ def prepare_run(expid, notransitive=False, start_time=None, start_after=None,
                     "job_packages not found", 6016, str(e))
             Log.debug("Processing job packages")
             try:
-                for (exp_id, package_name, job_name) in packages:
+                # Fallback value; it only affects is_over_wallclock
+                wrapper_wallclock = as_conf.experiment_data.get("CONFIG", {}).get("WRAPPERS_WALLCLOCK", "48:00")
+                for (exp_id, package_name, job_name, wrapper_wallclock) in packages:
                     if package_name not in job_list.packages_dict:
                         job_list.packages_dict[package_name] = []
                     job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name))
                 # This function checks the stored STATUS of the jobs inside wrappers, since the wrapper status only lives in memory.
-                job_list = Autosubmit.check_wrapper_stored_status(as_conf, job_list)
+                job_list = Autosubmit.check_wrapper_stored_status(as_conf, job_list, wrapper_wallclock)
             except Exception as e:
                 raise AutosubmitCritical(
                     "Autosubmit failed while processing job packages. This might be due to a change in your experiment configuration files after 'autosubmit create' was performed.",
@@ -5750,10 +5737,15 @@ def load_job_list(expid, as_conf, notransitive=False, monitor=False,
             new = True)
         if str(rerun).lower() == "true":
             rerun_jobs = as_conf.get_rerun_jobs()
-            job_list.rerun(rerun_jobs,as_conf, monitor=monitor)
+            job_list.rerun(rerun_jobs, as_conf, monitor=monitor)
         else:
             job_list.remove_rerun_only_jobs(notransitive)
 
+        # The inspect -cw and create -cw commands had issues at this point.
+        # Reset the packed value on load so the jobs can be wrapped again.
+        for job in job_list.get_waiting() + job_list.get_ready():
+            job.packed = False
+
         return job_list
 
     @staticmethod
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index 32afeb78c..9dc1b8c81 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -219,7 +219,7 @@ def __init__(self, name, job_id, status, priority):
         #: (int) Number of failed attempts to run this job. (FAIL_COUNT)
         self._fail_count = 0
         self.expid = name.split('_')[0]  # type: str
-        self.parameters = None
+        self.parameters = dict()
         self._tmp_path = os.path.join(
             BasicConfig.LOCAL_ROOT_DIR, self.expid, BasicConfig.LOCAL_TMP_DIR)
         self._log_path = Path(f"{self._tmp_path}/LOG_{self.expid}")
@@ -260,6 +260,7 @@ def __init__(self, name, job_id, status, priority):
         self.ready_date = None
         self.wrapper_name = None
         self.is_wrapper = False
+        self._wallclock_in_seconds = None
 
     def _adjust_new_parameters(self) -> None:
         """
@@ -277,6 +278,10 @@ def _adjust_new_parameters(self) -> None:
         if not hasattr(self, "_log_path"):  # Added in 4.1.12
             self._log_path = Path(f"{self._tmp_path}/LOG_{self.expid}")
 
+        if not hasattr(self, "_wallclock_in_seconds"):  # Added in 4.1.12
+            self._wallclock_in_seconds = None
+            self.wallclock = self.wallclock  # also sets the wallclock in seconds
+
     def _init_runtime_parameters(self):
         # hetjobs
         self.het = {'HETSIZE': 0}
@@ -295,11 +300,16 @@ def _init_runtime_parameters(self):
         self.stat_file = self.script_name[:-4] + "_STAT_0"
 
+    @property
+    def wallclock_in_seconds(self):
+        return self._wallclock_in_seconds
+
     @property
     @autosubmit_parameter(name='x11')
     def x11(self):
         """Whether to use X11 forwarding"""
         return self._x11
+
     @x11.setter
     def x11(self, value):
         self._x11 = value
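The `hasattr` guards in `_adjust_new_parameters()` exist because Job objects restored from an older pickle predate the newly added attributes. A minimal, self-contained sketch of that migration pattern, using a simplified stand-in Job rather than the real class:

```python
# Sketch of the backward-compatibility pattern used by _adjust_new_parameters():
# jobs restored from an old pickle may lack attributes added in later versions,
# so they are filled in after loading. Stand-in Job class for illustration.
import pickle

class Job:
    def __init__(self, name):
        self.name = name

    def _adjust_new_parameters(self):
        if not hasattr(self, "_wallclock_in_seconds"):  # attribute added later
            self._wallclock_in_seconds = None

old_job = pickle.loads(pickle.dumps(Job("a000_SIM")))  # simulates a stored job
old_job._adjust_new_parameters()
assert old_job._wallclock_in_seconds is None
```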
@@ -436,6 +446,16 @@ def wallclock(self):
     def wallclock(self, value):
         self._wallclock = value
+        if not self._wallclock_in_seconds or self.status not in [Status.RUNNING, Status.QUEUING, Status.SUBMITTED]:
+            # It should always take the max_wallclock set in the platform; this value is only a fallback
+            # (and the local platform doesn't have a max_wallclock defined).
+            if not self._wallclock or self._wallclock == "00:00":
+                self._wallclock = self.parameters.get("CONFIG.JOB_WALLCLOCK", "24:00")
+                Log.warning(f"No wallclock is set for this job. Defaulting to {self._wallclock}. "
+                            "You can change this value in CONFIG.JOB_WALLCLOCK")
+            wallclock_parsed = self.parse_time(self._wallclock)
+            self._wallclock_in_seconds = self._time_in_seconds_and_margin(wallclock_parsed)
+
     @property
     @autosubmit_parameter(name='hyperthreading')
     def hyperthreading(self):
@@ -718,7 +738,7 @@ def platform(self):
         """
         Returns the platform to be used by the job. Chooses between serial and parallel platforms.
 
-        :return HPCPlatform object for the job to use
+        :return: HPCPlatform object for the job to use
         :rtype: HPCPlatform
         """
         if self.is_serial and self._platform:
@@ -1273,49 +1293,59 @@ def retrieve_logfiles(self, platform: Any, raise_error: bool = False) -> Dict[st
             Log.result(f"{platform.name}(log_recovery) Successfully recovered log for job '{self.name}' and retry '{self.fail_count}'.")
 
-    def parse_time(self,wallclock):
+    def _max_possible_wallclock(self):
+        if self.platform and self.platform.max_wallclock:
+            wallclock = self.parse_time(self.platform.max_wallclock)
+            if wallclock:
+                return int(wallclock.total_seconds())
+        return None
+
+    def _time_in_seconds_and_margin(self, wallclock: datetime.timedelta) -> int:
+        """
+        Calculate the total wallclock time in seconds, with a safety margin.
+
+        This method increases the given wallclock time by 30%, caps it at the
+        maximum wallclock allowed by the platform, and returns the result in seconds.
+
+        :param wallclock: The original wallclock time.
+        :type wallclock: datetime.timedelta
+        :return: The total wallclock time in seconds.
+        :rtype: int
+        """
+        total = int(wallclock.total_seconds() * 1.30)
+        total_platform = self._max_possible_wallclock()
+        if not total_platform:
+            total_platform = total
+        if total > total_platform:
+            Log.warning(f"Job {self.name} has a wallclock time '{total} seconds' higher than the maximum allowed by the platform '{total_platform} seconds'. "
+                        f"Setting the wallclock time to the maximum allowed by the platform.")
+            total = total_platform
+        wallclock_delta = datetime.timedelta(seconds=total)
+        return int(wallclock_delta.total_seconds())
+
+    @staticmethod
+    def parse_time(wallclock):
         regex = re.compile(r'(((?P<hours>\d+):)((?P<minutes>\d+)))(:(?P<seconds>\d+))?')
         parts = regex.match(wallclock)
         if not parts:
-            return
+            return None
         parts = parts.groupdict()
-        if int(parts['hours']) > 0 :
-            format_ = "hour"
-        else:
-            format_ = "minute"
         time_params = {}
         for name, param in parts.items():
             if param:
                 time_params[name] = int(param)
-        return datetime.timedelta(**time_params),format_
+        return datetime.timedelta(**time_params)
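For reference, the margin rule above in isolation: parse "HH:MM[:SS]", add 30%, cap at the platform maximum. A standalone sketch with example values; `parse_time` here mirrors the static method above rather than importing it:

```python
# Standalone sketch of the wallclock margin rule: 30% margin, capped by the
# platform's max wallclock. The example values are illustrative only.
import re
from datetime import timedelta
from typing import Optional

def parse_time(wallclock: str) -> Optional[timedelta]:
    parts = re.match(r'(((?P<hours>\d+):)((?P<minutes>\d+)))(:(?P<seconds>\d+))?', wallclock)
    if not parts:
        return None
    return timedelta(**{k: int(v) for k, v in parts.groupdict().items() if v})

def wallclock_with_margin(job_wallclock: str, platform_max: str) -> int:
    total = int(parse_time(job_wallclock).total_seconds() * 1.30)  # 30% margin
    cap = int(parse_time(platform_max).total_seconds())            # platform maximum
    return min(total, cap)

assert wallclock_with_margin("00:01", "02:00") == 78    # 60 s * 1.3
assert wallclock_with_margin("02:00", "02:00") == 7200  # capped at the platform max
```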
 
-    # Duplicated for wrappers and jobs to fix in 4.0.0
-    def is_over_wallclock(self, start_time, wallclock):
+    # TODO: Duplicated for wrappers and jobs; to fix in 4.1.X. In wrappers it is called _is_over_wallclock for unknown reasons.
+    def is_over_wallclock(self):
         """
         Check if the job is over the wallclock time; this is an alternative method to avoid platform issues.
 
-        :param start_time:
-        :param wallclock:
         :return:
         """
-        elapsed = datetime.datetime.now() - start_time
-        wallclock,time_format = self.parse_time(wallclock)
-        if time_format == "hour":
-            total = wallclock.days * 24 + wallclock.seconds / 60 / 60
-        else:
-            total = wallclock.days * 24 + wallclock.seconds / 60
-        total = total * 1.30 # in this case we only want to avoid slurm issues so the time is increased by 50%
-        if time_format == "hour":
-            hour = int(total)
-            minute = int((total - int(total)) * 60.0)
-            second = int(((total - int(total)) * 60 -
-                          int((total - int(total)) * 60.0)) * 60.0)
-            wallclock_delta = datetime.timedelta(hours=hour, minutes=minute,
-                                                 seconds=second)
-        else:
-            minute = int(total)
-            second = int((total - int(total)) * 60.0)
-            wallclock_delta = datetime.timedelta(minutes=minute, seconds=second)
-        if elapsed > wallclock_delta:
+        elapsed = datetime.datetime.now() - self.start_time
+        if int(elapsed.total_seconds()) > self.wallclock_in_seconds:
+            Log.warning(f"Job {self.name} is over wallclock time, Autosubmit will check if it is completed")
             return True
         return False
@@ -2052,8 +2082,8 @@ def update_parameters(self, as_conf, parameters,
             as_conf.reload()
         self._adjust_new_parameters()
         self._init_runtime_parameters()
-        if hasattr(self, "start_time"):
-            self.start_time = time.time()
+        if not hasattr(self, "start_time"):
+            self.start_time = datetime.datetime.now()
         # Parameters that affect all of the other parameters
         self.update_dict_parameters(as_conf)
         parameters = parameters.copy()
@@ -2610,7 +2640,7 @@ def __init__(self, name, job_id, status, priority, job_list, total_wallclock, nu
         self.failed = False
         self.job_list = job_list
         # divide jobs in dictionary by state?
-        self.wallclock = total_wallclock
+        self.wallclock = total_wallclock  # Now it is reloaded after a run -> stop -> run
         self.num_processors = num_processors
         self.running_jobs_start = OrderedDict()
         self._platform = platform
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index 58f3fbf47..1541f9203 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -3034,8 +3034,7 @@ def save_wrappers(self, packages_to_save, failed_packages, as_conf, packages_per
                 self.job_package_map[package.jobs[0].id] = wrapper_job
                 if isinstance(package, JobPackageThread):
                     # Saving only when it is a real multi-job package
-                    packages_persistence.save(
-                        package.name, package.jobs, package._expid, inspect)
+                    packages_persistence.save(package, inspect)  # Need to store the wallclock for the is_over_wallclock function
 
     def check_scripts(self, as_conf):
         """
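With `wallclock_in_seconds` precomputed at assignment time (margin and cap included), the runtime check above reduces to a single elapsed-time comparison against the stored `start_time`. A self-contained sketch; the Job class is a stand-in:

```python
# Minimal sketch of the simplified over-wallclock check above.
import datetime

class Job:
    def __init__(self, wallclock_in_seconds: int):
        self.start_time = datetime.datetime.now()
        self.wallclock_in_seconds = wallclock_in_seconds

    def is_over_wallclock(self) -> bool:
        elapsed = datetime.datetime.now() - self.start_time
        return int(elapsed.total_seconds()) > self.wallclock_in_seconds

job = Job(wallclock_in_seconds=78)
job.start_time -= datetime.timedelta(minutes=2)  # simulate a long-running job
assert job.is_over_wallclock()
```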
diff --git a/autosubmit/job/job_package_persistence.py b/autosubmit/job/job_package_persistence.py
index 91411274e..766147f77 100644
--- a/autosubmit/job/job_package_persistence.py
+++ b/autosubmit/job/job_package_persistence.py
@@ -18,6 +18,8 @@
 # along with Autosubmit.  If not, see <http://www.gnu.org/licenses/>.
 
 from autosubmit.database.db_manager import DbManager
+from log.log import AutosubmitCritical
+from typing import Any, List
 
 class JobPackagePersistence(object):
@@ -35,24 +37,35 @@ class JobPackagePersistence(object):
     VERSION = 1
     JOB_PACKAGES_TABLE = 'job_package'
     WRAPPER_JOB_PACKAGES_TABLE = 'wrapper_job_package'
-    TABLE_FIELDS = ['exp_id', 'package_name', 'job_name']
+    TABLE_FIELDS = ['exp_id', 'package_name', 'job_name', 'wallclock']  # new field, needs a new autosubmit create
 
     def __init__(self, persistence_path, persistence_file):
         self.db_manager = DbManager(persistence_path, persistence_file, self.VERSION)
         self.db_manager.create_table(self.JOB_PACKAGES_TABLE, self.TABLE_FIELDS)
         self.db_manager.create_table(self.WRAPPER_JOB_PACKAGES_TABLE, self.TABLE_FIELDS)
 
-    def load(self,wrapper=False):
+
+    def load(self, wrapper=False) -> List[Any]:
         """
         Loads packages of jobs from the database.
 
         :param wrapper: boolean
-        :return: dictionary of jobs per package
-
-
+        :return: list of jobs per package
         """
         if not wrapper:
-            return self.db_manager.select_all(self.JOB_PACKAGES_TABLE)
+            results = self.db_manager.select_all(self.JOB_PACKAGES_TABLE)
         else:
-            return self.db_manager.select_all(self.WRAPPER_JOB_PACKAGES_TABLE)
+            results = self.db_manager.select_all(self.WRAPPER_JOB_PACKAGES_TABLE)
+        if len(results) > 0:
+            # ['exp_id', 'package_name', 'job_name', 'wallclock']; wallclock is the new addition
+            for wrapper in results:
+                if len(wrapper) != 4:
+                    # New field in the db, so the stored packages are not compatible unless the
+                    # wrapper packages are reset (done in the create function)
+                    raise AutosubmitCritical("Error while loading the wrappers. The stored wrappers have a different "
+                                             "number of fields than expected, possibly due to using different "
+                                             "versions of Autosubmit in the same experiment. Please run "
+                                             "'autosubmit create -f <expid>' to fix this issue.")
+        return results
+
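Hypothetical usage of the guarded `load()` above, assuming an Autosubmit environment; the paths are placeholders, and the except branch is what an experiment created with an older Autosubmit would hit:

```python
# Hypothetical usage of JobPackagePersistence.load() with the new schema guard.
from autosubmit.job.job_package_persistence import JobPackagePersistence
from log.log import AutosubmitCritical

persistence = JobPackagePersistence("/path/to/pkl", "job_packages_a000")  # placeholder paths
try:
    for exp_id, package_name, job_name, wallclock in persistence.load(wrapper=True):
        print(package_name, job_name, wallclock)
except AutosubmitCritical:
    print("Legacy schema detected: run 'autosubmit create -f <expid>' to rebuild the packages.")
```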
Please, run " + "'autosubmit create -f ' to fix this issue.") + return results + def reset(self): """ Loads package of jobs from a database @@ -60,26 +73,24 @@ def reset(self): """ self.db_manager.drop_table(self.WRAPPER_JOB_PACKAGES_TABLE) self.db_manager.create_table(self.WRAPPER_JOB_PACKAGES_TABLE, self.TABLE_FIELDS) - def save(self, package_name, jobs, exp_id,wrapper=False): + + def save(self, package, preview_wrappers=False): """ Persists a job list in a database - :param package_name: str - :param jobs: list of jobs - :param exp_id: str - :param wrapper: boolean - - + :param package: all wrapper attributes + :param preview_wrappers: boolean """ #self._reset_table() job_packages_data = [] - for job in jobs: - job_packages_data += [(exp_id, package_name, job.name)] + for job in package.jobs: + job_packages_data += [(package._expid, package.name, job.name, package._wallclock)] - if wrapper: + if preview_wrappers: self.db_manager.insertMany(self.WRAPPER_JOB_PACKAGES_TABLE, job_packages_data) else: self.db_manager.insertMany(self.JOB_PACKAGES_TABLE, job_packages_data) self.db_manager.insertMany(self.WRAPPER_JOB_PACKAGES_TABLE, job_packages_data) + def reset_table(self,wrappers=False): """ Drops and recreates the database @@ -91,4 +102,4 @@ def reset_table(self,wrappers=False): self.db_manager.drop_table(self.JOB_PACKAGES_TABLE) self.db_manager.create_table(self.JOB_PACKAGES_TABLE, self.TABLE_FIELDS) self.db_manager.drop_table(self.WRAPPER_JOB_PACKAGES_TABLE) - self.db_manager.create_table(self.WRAPPER_JOB_PACKAGES_TABLE, self.TABLE_FIELDS) \ No newline at end of file + self.db_manager.create_table(self.WRAPPER_JOB_PACKAGES_TABLE, self.TABLE_FIELDS) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 5dc0d38df..8ef5b3c74 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -741,7 +741,7 @@ def __init__(self, jobs, dependency=None,configuration=None,wrapper_section="WRA self._wallclock = sum_str_hours(self._wallclock, job.wallclock) self._name = f"{self._expid}_{self.FILE_PREFIX}_{jobs_in_wrapper_str(configuration,self.current_wrapper_section)}_{str(int(time.time())) + str(random.randint(1, 10000))}_{self._num_processors}_{len(self._jobs)}" - def parse_time(self): + def parse_time(self): # TODO: Remove this function and use the one in the Job class or move the one in the job class into utils format_ = "minute" # noinspection Annotator regex = re.compile(r'(((?P\d+):)((?P\d+)))(:(?P\d+))?') @@ -759,7 +759,7 @@ def parse_time(self): time_params[name] = int(param) return timedelta(**time_params),format_ def _common_script_content(self): - if self.jobs[0].wrapper_type == "vertical": + if self.jobs[0].wrapper_type == "vertical": # TODO: normalize this logic to be the same as the one in the Job class wallclock,format_ = self.parse_time() original_wallclock_to_seconds = wallclock.days * 86400.0 + wallclock.seconds diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index ccdfffb02..8d9389f2e 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -195,7 +195,7 @@ def create_tree_list(self, expid, joblist, packages, groups, hide_groups=False): jobs_packages_dict = dict() if packages is not None and len(str(packages)) > 0: - for (exp_id, package_name, job_name) in packages: + for (exp_id, package_name, job_name, wallclock) in packages: jobs_packages_dict[job_name] = package_name packages_subgraphs_dict = dict() diff --git a/autosubmit/platforms/locplatform.py 
diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py
index 94e6c7aa2..ba3ec7bbf 100644
--- a/autosubmit/platforms/locplatform.py
+++ b/autosubmit/platforms/locplatform.py
@@ -104,8 +104,7 @@ def jobs_in_queue(self):
     def get_submit_cmd(self, job_script, job, hold=False, export=""):
         if job:  # Not intuitive at all, but if it is not a job, it is a wrapper
-            wallclock = self.parse_time(job.wallclock)
-            seconds = int(wallclock.days * 86400 + wallclock.seconds * 60)
+            seconds = job.wallclock_in_seconds
         else:
             # TODO for another branch: add a timeout to the wrapped jobs even if the wallclock is 0; default to 2 days
             seconds = 60*60*24*2
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index eba3f9f95..2f8396212 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -586,6 +586,24 @@ def parse_estimated_time(self, output):
         """
         raise NotImplementedError
 
+    def job_is_over_wallclock(self, job, job_status, cancel=False):
+        if job.is_over_wallclock():
+            try:
+                job.platform.get_completed_files(job.name)
+                job_status = job.check_completion(over_wallclock=True)
+            except Exception as e:
+                job_status = Status.FAILED
+                Log.debug(f"Unexpected error checking completed files for a job over wallclock: {str(e)}")
+
+            if cancel and job_status is Status.FAILED:
+                try:
+                    if self.cancel_cmd is not None:
+                        Log.warning(f"Job {job.id} is over wallclock, cancelling job")
+                        job.platform.send_command(self.cancel_cmd + " " + str(job.id))
+                except Exception as e:
+                    Log.debug(f"Error cancelling job {job.id}: {str(e)}")
+        return job_status
+
     def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold_check=False, is_wrapper=False):
         """
         Checks job running status
@@ -654,12 +672,7 @@ def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold
             if job.wallclock == "00:00" or job.wallclock is None:
                 wallclock = job.platform.max_wallclock
             if wallclock != "00:00" and wallclock != "00:00:00" and wallclock != "":
-                if job.is_over_wallclock(job.start_time,wallclock):
-                    try:
-                        job.platform.get_completed_files(job.name)
-                        job_status = job.check_completion(over_wallclock=True)
-                    except Exception as e:
-                        job_status = Status.FAILED
+                job_status = self.job_is_over_wallclock(job, job_status, cancel=False)
         elif job_status in self.job_status['QUEUING'] and (not job.hold or job.hold.lower() != "true"):
             job_status = Status.QUEUING
         elif job_status in self.job_status['QUEUING'] and (job.hold or job.hold.lower() == "true"):
@@ -786,18 +799,7 @@ def check_Alljobs(self, job_list, as_conf, retries=5):
                     if job.wallclock == "00:00":
                         wallclock = job.platform.max_wallclock
                     if wallclock != "00:00" and wallclock != "00:00:00" and wallclock != "":
-                        if job.is_over_wallclock(job.start_time,wallclock):
-                            try:
-                                job.platform.get_completed_files(job.name)
-                                job_status = job.check_completion(over_wallclock=True)
-                                if job_status is Status.FAILED:
-                                    try:
-                                        if self.cancel_cmd is not None:
-                                            job.platform.send_command(self.cancel_cmd + " " + str(job.id))
-                                    except:
-                                        pass
-                            except:
-                                job_status = Status.FAILED
+                        job_status = self.job_is_over_wallclock(job, job_status, cancel=True)
                     if job_status in self.job_status['COMPLETED']:
                         job_status = Status.COMPLETED
                     elif job_status in self.job_status['RUNNING']:
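Both call sites now funnel through `job_is_over_wallclock()`, differing only in the `cancel` flag. A self-contained sketch of the cancel path with stand-in objects in place of the real platform and job classes:

```python
# Self-contained sketch of the consolidated cancel path above; stand-in objects.
from types import SimpleNamespace

FAILED = "FAILED"

def job_is_over_wallclock(platform, job, job_status, cancel=False):
    if job.is_over_wallclock():
        try:
            platform.get_completed_files(job.name)
            job_status = job.check_completion(over_wallclock=True)
        except Exception:
            job_status = FAILED
        if cancel and job_status is FAILED and platform.cancel_cmd is not None:
            platform.send_command(f"{platform.cancel_cmd} {job.id}")
    return job_status

sent = []
platform = SimpleNamespace(
    cancel_cmd="scancel",
    get_completed_files=lambda name: None,
    send_command=sent.append,
)
job = SimpleNamespace(
    id=42, name="a000_SIM",
    is_over_wallclock=lambda: True,
    check_completion=lambda over_wallclock: FAILED,
)
assert job_is_over_wallclock(platform, job, "RUNNING", cancel=True) == FAILED
assert sent == ["scancel 42"]
```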
@@ -1381,18 +1383,6 @@ def get_header(self, job):
             header = header.replace(
                 '%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job))
         return header
-    def parse_time(self,wallclock):
-        # noinspection Annotator
-        regex = re.compile(r'(((?P<hours>\d+):)((?P<minutes>\d+)))(:(?P<seconds>\d+))?')
-        parts = regex.match(wallclock)
-        if not parts:
-            return
-        parts = parts.groupdict()
-        time_params = {}
-        for name, param in parts.items():
-            if param:
-                time_params[name] = int(param)
-        return timedelta(**time_params)
 
     def closeConnection(self):
         # Ensure to delete all references to the ssh connection, so that it frees all the file descriptors
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index d8f97b299..0448f5e24 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -334,8 +334,7 @@ def submit_ready_jobs(self, as_conf, job_list, platforms_to_test, packages_persi
                             package._wallclock, package._num_processors,
                             package.platform, as_conf, hold)
                         job_list.job_package_map[package.jobs[0].id] = wrapper_job
-                        packages_persistence.save(
-                            package.name, package.jobs, package._expid, inspect)
+                        packages_persistence.save(package, inspect)
                     for innerJob in package._jobs:
                         any_job_submitted = True
                         # Setting status to COMPLETED, so it does not get stuck in the loop that calls this function
diff --git a/autosubmit/platforms/psplatform.py b/autosubmit/platforms/psplatform.py
index 2a5fe05d5..edb97f8bb 100644
--- a/autosubmit/platforms/psplatform.py
+++ b/autosubmit/platforms/psplatform.py
@@ -96,13 +96,11 @@ def jobs_in_queue(self):
         return [int(element.firstChild.nodeValue) for element in jobs_xml]
 
     def get_submit_cmd(self, job_script, job, hold=False, export=""):
-        wallclock = self.parse_time(job.wallclock)
-        seconds = int(wallclock.days * 86400 + wallclock.seconds * 60)
         if export == "none" or export == "None" or export is None or export == "":
             export = ""
         else:
             export += " ; "
-        return self.get_call(job_script, job, export = export, timeout=seconds)
+        return self.get_call(job_script, job, export=export, timeout=job.wallclock_in_seconds)
 
     def get_checkjob_cmd(self, job_id):
         return self.get_pscall(job_id)
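Worth pausing on what the removed submit-command code computed: the timedelta's `seconds` field was multiplied by 60 again, so a two-hour wallclock became roughly five days of timeout. The arithmetic, standalone:

```python
# What the removed formula above computed, versus the new margin-based value.
from datetime import timedelta

wallclock = timedelta(hours=2)
old_seconds = int(wallclock.days * 86400 + wallclock.seconds * 60)  # removed formula
assert old_seconds == 432000           # 7200 s * 60: sixty times too large
assert int(wallclock.total_seconds() * 1.30) == 9360  # new margin-based value
```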
diff --git a/docs/source/userguide/configure/develop_a_project.rst b/docs/source/userguide/configure/develop_a_project.rst
index 132d800af..90c4ec164 100644
--- a/docs/source/userguide/configure/develop_a_project.rst
+++ b/docs/source/userguide/configure/develop_a_project.rst
@@ -137,14 +137,16 @@ Autosubmit configuration
       # This parameter is used to enable the use of threads in autosubmit for the wrappers.
       # Default False
       ENABLE_WRAPPER_THREADS: False
       OUTPUT:pdf
-      # wrapper definition
-      wrappers:
-        wrapper_1_v_example:
-          TYPE: Vertical
-          JOBS_IN_WRAPPER: sim
-        wrapper_2_h_example:
-          TYPE: Horizontal
-          JOBS_IN_WRAPPER: da
+      WRAPPERS_WALLCLOCK: 48:00  # Default max_wallclock for wrappers before getting killed
+      JOB_WALLCLOCK: 24:00  # Default max_wallclock for jobs before getting killed
+      # wrapper definition
+      wrappers:
+        wrapper_1_v_example:
+          TYPE: Vertical
+          JOBS_IN_WRAPPER: sim
+        wrapper_2_h_example:
+          TYPE: Horizontal
+          JOBS_IN_WRAPPER: da
 
 Jobs configuration
 ==================
diff --git a/test/unit/conftest.py b/test/unit/conftest.py
index f01a8cb84..722c1c786 100644
--- a/test/unit/conftest.py
+++ b/test/unit/conftest.py
@@ -182,3 +182,23 @@ def finalizer() -> None:
     request.addfinalizer(finalizer)
 
     return _create_autosubmit_config
+
+
+@pytest.fixture
+def prepare_basic_config(tmpdir):
+    basic_conf = BasicConfig()
+    BasicConfig.DB_DIR = (tmpdir / "exp_root")
+    BasicConfig.DB_FILE = "debug.db"
+    BasicConfig.LOCAL_ROOT_DIR = (tmpdir / "exp_root")
+    BasicConfig.LOCAL_TMP_DIR = "tmp"
+    BasicConfig.LOCAL_ASLOG_DIR = "ASLOGS"
+    BasicConfig.LOCAL_PROJ_DIR = "proj"
+    BasicConfig.DEFAULT_PLATFORMS_CONF = ""
+    BasicConfig.CUSTOM_PLATFORMS_PATH = ""
+    BasicConfig.DEFAULT_JOBS_CONF = ""
+    BasicConfig.SMTP_SERVER = ""
+    BasicConfig.MAIL_FROM = ""
+    BasicConfig.ALLOWED_HOSTS = ""
+    BasicConfig.DENIED_HOSTS = ""
+    BasicConfig.CONFIG_FILE_FOUND = False
+    return basic_conf
diff --git a/test/unit/provenance/test_rocrate.py b/test/unit/provenance/test_rocrate.py
index a2a6544b0..cc57cd946 100644
--- a/test/unit/provenance/test_rocrate.py
+++ b/test/unit/provenance/test_rocrate.py
@@ -563,7 +563,8 @@ def test_rocrate_main(
         job2.processors = '1'
 
         mocked_job_list.get_job_list.return_value = [job1, job2]
-
+        mocked_job_list.get_ready.return_value = []  # Mocked due to the new addition in job_list.load()
+        mocked_job_list.get_waiting.return_value = []  # Mocked due to the new addition in job_list.load()
         autosubmit = Autosubmit()
         r = autosubmit.rocrate(self.expid, path=Path(temp_dir))
         self.assertTrue(r)
@@ -748,4 +749,3 @@ def test_no_duplicate_ids(
         self.assertIsNotNone(crate)
         data_entities_ids = [data_entity['@id'] for data_entity in crate.data_entities]
         self.assertEqual(len(data_entities_ids), len(set(data_entities_ids)), f'Duplicate IDs found in the RO-Crate data entities: {str(data_entities_ids)}')
-
diff --git a/test/unit/test_job.py b/test/unit/test_job.py
index c56d8cd54..8ac68d7a4 100644
--- a/test/unit/test_job.py
+++ b/test/unit/test_job.py
@@ -1100,6 +1100,7 @@ def test_job_script_checking_contains_the_right_variables(self):
         dummy_platform = MagicMock()
         dummy_platform.serial_platform = dummy_serial_platform
         dummy_platform.name = 'dummy_platform'
+        dummy_platform.max_wallclock = '00:55'
         self.as_conf.substitute_dynamic_variables = MagicMock()
         default = {'d': '%d%', 'd_': '%d_%', 'Y': '%Y%', 'Y_': '%Y_%',
diff --git a/test/unit/test_job_graph.py b/test/unit/test_job_graph.py
index 579aee5ad..b02e74fcf 100644
--- a/test/unit/test_job_graph.py
+++ b/test/unit/test_job_graph.py
@@ -381,13 +381,13 @@ def test_synchronize_date(self):
         self.assertListEqual(sorted(list(subgraph['edges'].keys())), sorted(edges))
 
     def test_wrapper_package(self):
-        packages = [('expid', 'package_d1_m1_SIM', 'expid_d1_m1_1_SIM'), ('expid', 'package_d1_m1_SIM', 'expid_d1_m1_2_SIM'),
-                    ('expid', 'package_d2_m2_SIM', 'expid_d2_m2_1_SIM'), ('expid', 'package_d2_m2_SIM', 'expid_d2_m2_2_SIM')]
+        packages = [('expid', 'package_d1_m1_SIM',
'expid_d1_m1_1_SIM', "02:00"), ('expid', 'package_d1_m1_SIM', 'expid_d1_m1_2_SIM', "02:00"), + ('expid', 'package_d2_m2_SIM', 'expid_d2_m2_1_SIM', "02:00"), ('expid', 'package_d2_m2_SIM', 'expid_d2_m2_2_SIM', "02:00")] monitor = Monitor() graph = monitor.create_tree_list(self.experiment_id, self.job_list.get_job_list(), packages, dict()) self.assertFalse(graph.obj_dict['strict']) - for (expid, package, job_name) in packages: + for (expid, package, job_name, _) in packages: self.assertIn('cluster_'+package, graph.obj_dict['subgraphs']) def test_synchronize_member_group_member(self): @@ -895,16 +895,16 @@ def test_wrapper_and_groups(self): ("expid_d2_m2_1_SIM", "d2_m2_2") ] - packages = [('expid', 'package_d1_m1_SIM', 'expid_d1_m1_1_SIM'), - ('expid', 'package_d1_m1_SIM', 'expid_d1_m1_2_SIM'), - ('expid', 'package_d2_m2_SIM', 'expid_d2_m2_1_SIM'), - ('expid', 'package_d2_m2_SIM', 'expid_d2_m2_2_SIM')] + packages = [('expid', 'package_d1_m1_SIM', 'expid_d1_m1_1_SIM', "02:00"), + ('expid', 'package_d1_m1_SIM', 'expid_d1_m1_2_SIM', "02:00"), + ('expid', 'package_d2_m2_SIM', 'expid_d2_m2_1_SIM', "02:00"), + ('expid', 'package_d2_m2_SIM', 'expid_d2_m2_2_SIM', "02:00")] monitor = Monitor() graph = monitor.create_tree_list(self.experiment_id, self.job_list.get_job_list(), packages, groups_dict) self.assertTrue(graph.obj_dict['strict']) - for (expid, package, job_name) in packages: + for (expid, package, job_name, _) in packages: if package != 'package_d2_m2_SIM': self.assertNotIn('cluster_' + package, graph.obj_dict['subgraphs']) else: @@ -937,4 +937,4 @@ def __init__(self): LOCAL_TMP_DIR = '/dummy/local/temp/dir' LOCAL_PROJ_DIR = '/dummy/local/proj/dir' DEFAULT_PLATFORMS_CONF = '' - DEFAULT_JOBS_CONF = '' \ No newline at end of file + DEFAULT_JOBS_CONF = '' diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py index 8058f0304..599f4b95a 100644 --- a/test/unit/test_job_package.py +++ b/test/unit/test_job_package.py @@ -97,6 +97,7 @@ def setUp(self): self.platform.queue = "debug" self.platform.partition = "debug" self.platform.serial_platform = self.platform + self.platform.serial_platform.max_wallclock = '24:00' self.platform.serial_queue = "debug-serial" self.platform.serial_partition = "debug-serial" self.jobs = [Job('dummy1', 0, Status.READY, 0), @@ -211,6 +212,7 @@ class MockAsConf: } return MockAsConf() + def test_jobs_in_wrapper_str(mock_as_conf): # Arrange current_wrapper = "current_wrapper" diff --git a/test/unit/test_job_pytest.py b/test/unit/test_job_pytest.py index ce7bbfb34..0b6c2ebe1 100644 --- a/test/unit/test_job_pytest.py +++ b/test/unit/test_job_pytest.py @@ -165,12 +165,15 @@ def test_update_parameters_attributes(autosubmit_config, experiment_data, attrib def test_adjust_new_parameters(test_packed): job = Job('dummy', '1', 0, 1) stored_log_path = job._log_path + job.wallclock = "00:01" del job.is_wrapper del job.wrapper_name + del job._wallclock_in_seconds del job._log_path job.packed = test_packed job._adjust_new_parameters() assert job.is_wrapper == test_packed + assert int(job._wallclock_in_seconds) == int(60*1.3) if test_packed: assert job.wrapper_name == "wrapped" else: @@ -245,3 +248,35 @@ def test_custom_directives(tmpdir, custom_directives, test_type, result_by_lines for directive in result_by_lines: pattern = r'^\s*' + re.escape(directive) + r'\s*$' # Match Start line, match directive, match end line assert re.search(pattern, template_content, re.MULTILINE) is not None + + +@pytest.mark.parametrize('experiment_data', [( + { + 'JOBS': { + 
'RANDOM-SECTION': { + 'FILE': "test.sh", + 'PLATFORM': 'DUMMY_PLATFORM', + 'TEST': "rng", + }, + }, + 'PLATFORMS': { + 'dummy_platform': { + 'type': 'ps', + 'whatever': 'dummy_value', + 'whatever2': 'dummy_value2', + 'CUSTOM_DIRECTIVES': ['$SBATCH directive1', '$SBATCH directive2'], + }, + }, + 'ROOTDIR': "asd", + 'LOCAL_TMP_DIR': "asd", + 'LOCAL_ROOT_DIR': "asd", + 'LOCAL_ASLOG_DIR': "asd", + } +)], ids=["Simple job"]) +def test_no_start_time(autosubmit_config, experiment_data): + job, as_conf = create_job_and_update_parameters(autosubmit_config, experiment_data) + del job.start_time + as_conf.force_load = False + as_conf.data_changed = False + job.update_parameters(as_conf, job.parameters) + assert isinstance(job.start_time, datetime) diff --git a/test/unit/test_overwallclock.py b/test/unit/test_overwallclock.py new file mode 100644 index 000000000..56f96f365 --- /dev/null +++ b/test/unit/test_overwallclock.py @@ -0,0 +1,165 @@ +from datetime import datetime, timedelta + +import pytest + +from autosubmit.autosubmit import Autosubmit +from autosubmit.job.job import Job +from autosubmit.job.job_common import Status +from autosubmit.job.job_list import JobList +from autosubmit.job.job_list_persistence import JobListPersistencePkl +from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal +from autosubmit.platforms.psplatform import PsPlatform +from autosubmit.platforms.slurmplatform import SlurmPlatform +from autosubmitconfigparser.config.yamlparser import YAMLParserFactory + + +@pytest.fixture +def setup_as_conf(autosubmit_config, tmpdir, prepare_basic_config): + exp_data = { + "WRAPPERS": { + "WRAPPERS": { + "JOBS_IN_WRAPPER": "dummysection" + } + }, + "LOCAL_ROOT_DIR": f"{tmpdir.strpath}", + "LOCAL_TMP_DIR": f'{tmpdir.strpath}', + "LOCAL_ASLOG_DIR": f"{tmpdir.strpath}", + "PLATFORMS": { + "PYTEST-UNSUPPORTED": { + "TYPE": "unknown", + "host": "", + "user": "", + "project": "", + "scratch_dir": "", + "MAX_WALLCLOCK": "", + "DISABLE_RECOVERY_THREADS": True + } + }, + + } + as_conf = autosubmit_config("random-id", exp_data) + return as_conf + + +@pytest.fixture +def new_job_list(setup_as_conf, tmpdir, prepare_basic_config): + job_list = JobList("random-id", prepare_basic_config, YAMLParserFactory(), + JobListPersistencePkl(), setup_as_conf) + + return job_list + + +@pytest.fixture +def new_platform_mock(mocker, tmpdir): + dummy_platform = mocker.MagicMock(autospec=SlurmPlatform) + # Add here as many attributes as needed + dummy_platform.name = 'dummy_platform' + dummy_platform.max_wallclock = "02:00" + + # When proc = 1, the platform used will be serial, so just nest the defined platform. 
+ dummy_platform.serial_platform = dummy_platform + return dummy_platform + + +def new_packages(as_conf, dummy_jobs): + packages = [ + JobPackageSimple([dummy_jobs[0]]), + JobPackageVertical(dummy_jobs, configuration=as_conf), + JobPackageHorizontal(dummy_jobs, configuration=as_conf), + ] + for package in packages: + if not isinstance(package, JobPackageSimple): + package._name = "wrapped" + return packages + + +def setup_jobs(dummy_jobs, new_platform_mock): + for job in dummy_jobs: + job._platform = new_platform_mock + job.processors = 2 + job.section = "dummysection" + job._init_runtime_parameters() + job.wallclock = "00:01" + job.start_time = datetime.now() - timedelta(minutes=1) + + +@pytest.mark.parametrize( + "initial_status, expected_status", + [ + (Status.SUBMITTED, Status.SUBMITTED), + (Status.QUEUING, Status.QUEUING), + (Status.RUNNING, Status.RUNNING), + (Status.FAILED, Status.FAILED), + (Status.COMPLETED, Status.COMPLETED), + (Status.HELD, Status.HELD), + (Status.UNKNOWN, Status.UNKNOWN), + ], + ids=["Submitted", "Queuing", "Running", "Failed", "Completed", "Held", "No packages"] +) +def test_check_wrapper_stored_status(setup_as_conf, new_job_list, new_platform_mock, initial_status, expected_status): + dummy_jobs = [Job("dummy-1", 1, initial_status, 0), Job("dummy-2", 2, initial_status, 0), Job("dummy-3", 3, initial_status, 0)] + setup_jobs(dummy_jobs, new_platform_mock) + new_job_list.jobs = dummy_jobs + if dummy_jobs[0].status != Status.UNKNOWN: + new_job_list.packages_dict = {"dummy_wrapper": dummy_jobs} + new_job_list = Autosubmit.check_wrapper_stored_status(setup_as_conf, new_job_list, "03:30") + assert new_job_list is not None + if dummy_jobs[0].status != Status.UNKNOWN: + assert new_job_list.job_package_map[dummy_jobs[0].id].status == expected_status + + +def test_parse_time(new_platform_mock): + job = Job("dummy-1", 1, Status.SUBMITTED, 0) + setup_jobs([job], new_platform_mock) + assert job.parse_time("0000") is None + assert job.parse_time("00:01") == timedelta(seconds=60) + + +def test_is_over_wallclock(new_platform_mock): + job = Job("dummy-1", 1, Status.SUBMITTED, 0) + setup_jobs([job], new_platform_mock) + job.wallclock = "00:01" + assert job.is_over_wallclock() is False + job.start_time = datetime.now() - timedelta(minutes=2) + assert job.is_over_wallclock() is True + + +@pytest.mark.parametrize( + "platform_class, platform_name", + [(SlurmPlatform, "Slurm"), (PsPlatform, "PS"), (PsPlatform, "PJM")], + ids=["SlurmPlatform", "PsPlatform", "PjmPlatform"] +) +def test_platform_job_is_over_wallclock(setup_as_conf, new_platform_mock, platform_class, platform_name, mocker): + platform_instance = platform_class("dummy", f"{platform_name}-dummy", setup_as_conf.experiment_data) + job = Job("dummy-1", 1, Status.RUNNING, 0) + setup_jobs([job], platform_instance) + job.wallclock = "00:01" + job_status = platform_instance.job_is_over_wallclock(job, Status.RUNNING) + assert job_status == Status.RUNNING + job.start_time = datetime.now() - timedelta(minutes=2) + job_status = platform_instance.job_is_over_wallclock(job, Status.RUNNING) + assert job_status == Status.FAILED + # check platform_instance is called + platform_instance.send_command = mocker.MagicMock() + job_status = platform_instance.job_is_over_wallclock(job, Status.RUNNING, True) + assert job_status == Status.FAILED + platform_instance.send_command.assert_called_once() + platform_instance.cancel_cmd = None + platform_instance.send_command = mocker.MagicMock() + platform_instance.job_is_over_wallclock(job, 
Status.RUNNING, True)
+    platform_instance.send_command.assert_not_called()
+
+
+@pytest.mark.parametrize(
+    "platform_class, platform_name",
+    [(SlurmPlatform, "Slurm"), (PsPlatform, "PS"), (PsPlatform, "PJM")],
+    ids=["SlurmPlatform", "PsPlatform", "PjmPlatform"]
+)
+def test_platform_job_is_over_wallclock_force_failure(setup_as_conf, new_platform_mock, platform_class, platform_name, mocker):
+    platform_instance = platform_class("dummy", f"{platform_name}-dummy", setup_as_conf.experiment_data)
+    job = Job("dummy-1", 1, Status.RUNNING, 0)
+    setup_jobs([job], platform_instance)
+    job.start_time = datetime.now() - timedelta(minutes=2)
+    job.platform.get_completed_files = mocker.MagicMock(side_effect=Exception("Error"))
+    job_status = platform_instance.job_is_over_wallclock(job, Status.RUNNING, True)
+    assert job_status == Status.FAILED
diff --git a/test/unit/test_packages.py b/test/unit/test_packages.py
index 8b3852511..a795d5694 100644
--- a/test/unit/test_packages.py
+++ b/test/unit/test_packages.py
@@ -16,10 +16,12 @@ def create_packages(mocker, autosubmit_config):
     }
     as_conf = autosubmit_config("a000", exp_data)
     jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)]
+    platform = mocker.MagicMock()
+    platform.name = 'dummy'
+    platform.serial_platform = mock.MagicMock()
+    platform.serial_platform.max_wallclock = '24:00'
     for job in jobs:
-        job._platform = mocker.MagicMock()
-        job._platform.name = "dummy"
-        job.platform_name = "dummy"
+        job._platform = platform
         job.processors = 2
         job.section = "dummysection"
         job._init_runtime_parameters()
diff --git a/test/unit/test_packages_persistence.py b/test/unit/test_packages_persistence.py
new file mode 100644
index 000000000..ad21519d7
--- /dev/null
+++ b/test/unit/test_packages_persistence.py
@@ -0,0 +1,20 @@
+import pytest
+
+from autosubmit.autosubmit import Autosubmit
+from autosubmit.job.job_package_persistence import JobPackagePersistence
+from log.log import AutosubmitCritical
+
+
+def test_load(mocker):
+    """
+    Test that load() returns the stored rows, and that it raises
+    AutosubmitCritical when the rows come from the legacy three-field schema.
+    """
+    mocker.patch('autosubmit.database.db_manager.DbManager.select_all').return_value = [['random-id', 'vertical-wrapper', 'dummy-job', '02:00']]
+    mocker.patch('sqlite3.connect').return_value = mocker.MagicMock()
+    job_package_persistence = JobPackagePersistence('dummy/path', 'dummy/file')
+    assert job_package_persistence.load(wrapper=True) == [['random-id', 'vertical-wrapper', 'dummy-job', '02:00']]
+    mocker.patch('autosubmit.database.db_manager.DbManager.select_all').return_value = [['random-id', 'vertical-wrapper', 'dummy-job']]
+    with pytest.raises(AutosubmitCritical):
+        job_package_persistence.load(wrapper=True)
diff --git a/test/unit/test_paramiko_platform.py b/test/unit/test_paramiko_platform.py
index ec602e5cc..9383ddc18 100644
--- a/test/unit/test_paramiko_platform.py
+++ b/test/unit/test_paramiko_platform.py
@@ -4,6 +4,7 @@
 from pathlib import Path
 
 from autosubmit.job.job_common import Status
+from autosubmit.job.job import Job
 from autosubmit.platforms.paramiko_platform import ParamikoPlatform
 from autosubmit.platforms.psplatform import PsPlatform
 from log.log import AutosubmitError
@@ -145,3 +146,19 @@ def test_send_file(mocker, ps_platform, filename, check):
     platform.send_command = mocker.Mock()
     platform.send_file(filename)
     assert check == (remote_dir / filename).exists()
+
+
+def test_ps_get_submit_cmd(ps_platform):
+    platform, _ = ps_platform
job = Job('TEST', 'TEST', Status.WAITING, 1) + job.wallclock = '00:01' + job.processors = 1 + job.section = 'dummysection' + job.platform_name = 'pytest-ps' + job.platform = platform + job.script_name = "echo hello world" + job.fail_count = 0 + command = platform.get_submit_cmd(job.script_name, job) + assert job.wallclock_in_seconds == 60 * 1.3 + assert f"{job.script_name}" in command + assert f"timeout {job.wallclock_in_seconds}" in command
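To close the loop on the test above, a standalone sketch of how the margin-adjusted wallclock surfaces in a PS-style submit command; the command template here is illustrative, not the exact string Autosubmit builds:

```python
# Illustrative command template only; Autosubmit assembles the real call elsewhere.
def get_submit_cmd(script_name: str, wallclock_in_seconds: float) -> str:
    return f"timeout {wallclock_in_seconds} {script_name}"

command = get_submit_cmd("echo hello world", 60 * 1.3)
assert "timeout 78.0" in command  # matches the assertion in test_ps_get_submit_cmd
```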