Skip to content

Commit

Permalink
Add all expid_dir_path functions to autosubmit.py, add attached files…
Browse files Browse the repository at this point in the history
… to emails (#1997)

* Add all expid_dir_path functions to autosubmit.py and fix tests !509

All previous definitions all expid_path, tmp_path, log_path and
aslogs_path (and similar) have been removed and their calls have been
switched for BasicConfig.foo() calls. New tests added

* Add compression of log files and one test

* Default to last log file and two more test cases

* Add more tests

* Add suggestions

* Small corrections

* Add last suggestions

* Linting

---------

Co-authored-by: Irene Simo Munoz <[email protected]>
  • Loading branch information
kinow and isimo00 authored Jan 22, 2025
1 parent 81f4079 commit f7c8c83
Show file tree
Hide file tree
Showing 6 changed files with 441 additions and 67 deletions.
217 changes: 183 additions & 34 deletions autosubmit/notifications/mail_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,62 +17,211 @@
# You should have received a copy of the GNU General Public License
# along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.

import smtplib
import email.utils
import smtplib
import zipfile
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from log.log import Log
from pathlib import Path
from tempfile import TemporaryDirectory
from textwrap import dedent
from typing import List, TYPE_CHECKING

from autosubmitconfigparser.config.basicconfig import BasicConfig
from log.log import Log, AutosubmitError

if TYPE_CHECKING:
from autosubmit.platforms.platform import Platform


def _compress_file(
temporary_directory: TemporaryDirectory,
file_path: Path) -> Path:
"""Compress a file.
The file is created inside the given temporary directory.
The function returns a ``Path`` of the archive file.
:param temporary_directory: The temporary directory.
:type temporary_directory: TemporaryDirectory
:param file_path: The path of the file to be compressed.
:type file_path: Path
:return: The Path object of the compressed file.
:rtype: str
:raises AutosubmitError: The file cannot be compressed.
"""
try:
zip_file_name = Path(temporary_directory.name, f'{file_path.name}.zip')
with zipfile.ZipFile(zip_file_name, 'w', zipfile.ZIP_DEFLATED) as zip_file:
zip_file.write(file_path, Path(file_path).name)
return Path(zip_file.filename)
except ValueError as e:
raise AutosubmitError(
code=6011,
message='An error has occurred while compressing log files for a warning email',
trace=str(e))


def _attach_file(file_path: Path, message: MIMEMultipart) -> None:
"""Attach a file to a message.
The attachment file name will be the same name of the ``file_path``.
:param file_path: The path of the file to be attached.
:type file_path: Path
:param message: The message for the file to be attached to.
:type message: MIMEMultipart
:raises AutosubmitError: The file cannot be attached.
"""
try:
compressed_file_name = file_path.name
part = MIMEApplication(
file_path.read_bytes(),
Name=compressed_file_name,
Content_disposition=f'attachment; filename="{compressed_file_name}'
)
message.attach(part)
except (TypeError, ValueError) as e:
raise AutosubmitError(
code=6011,
message='An error has occurred while attaching log files to a warning email about remote_platforms',
trace=str(e))


def _generate_message_text(
exp_id: str,
job_name: str,
prev_status: str,
status: str) -> str:
"""Generate email body to notify about status change.
:param exp_id: The experiment id.
:type exp_id: str
:param job_name: The name of the job.
:type job_name: str
:param prev_status: The previous status.
:type prev_status: str
:param status: The current status.
:type status: str
:return: The body of the email message.
"""
return dedent(f'''\
Autosubmit notification\n
-------------------------\n\n
Experiment id: {exp_id}\n\n
Job name: {job_name}\n\n
The job status has changed from: {prev_status} to {status}\n\n\n\n\n
INFO: This message was auto generated by Autosubmit,
remember that you can disable these messages on Autosubmit config file.\n''')


def _generate_message_experiment_status(
exp_id: str, platform: "Platform") -> str:
"""Generate email body for the experiment status notification.
:param exp_id: The experiment id.
:type exp_id: str
:param platform: The platform.
:type platform: Platform
:return: The email body for the experiment status notification.
:rtype: str
"""
return dedent(f'''\
Autosubmit notification: Remote Platform issues\n
-------------------------\n
Experiment id: {exp_id}
Logs and errors: {BasicConfig.expid_aslog_dir(exp_id)}
Attached to this message you will find the related _run.log files.\n
Platform affected: {platform.name} using as host: {platform.host}\n
[WARN] Autosubmit encountered an issue with a remote platform.
It will resume itself, whenever is possible
If this issue persists, you can change the host IP or put multiple hosts in the platform.yml file\n\n\n\n\n
INFO: This message was auto generated by Autosubmit,
remember that you can disable these messages on Autosubmit config file.\n''')


class MailNotifier:
def __init__(self, basic_config):
self.config = basic_config

def notify_experiment_status(self, exp_id,mail_to,platform):
message_text = self._generate_message_experiment_status(exp_id, platform)
message = MIMEText(message_text)
message['From'] = email.utils.formataddr(('Autosubmit', self.config.MAIL_FROM))
message['Subject'] = f'[Autosubmit] Warning a remote platform is malfunctioning'
def notify_experiment_status(
self,
exp_id: str,
mail_to: List[str],
platform: "Platform") -> None:
"""Send email notifications.
The latest run_log file (we consider latest the last one sorting
the listing by name).
:param exp_id: The experiment id.
:type exp_id: str
:param mail_to: The email address.
:type mail_to: List[str]
:param platform: The platform.
:type platform: Platform
"""
if not isinstance(mail_to, list):
raise ValueError('mail_to must be a list of emails!')
message_text = _generate_message_experiment_status(exp_id, platform)
message = MIMEMultipart()
message['From'] = email.utils.formataddr(
('Autosubmit', self.config.MAIL_FROM))
message['Subject'] = '[Autosubmit] Warning: a remote platform is malfunctioning'
message['Date'] = email.utils.formatdate(localtime=True)
message.attach(MIMEText(message_text))

run_log_files = [f for f in self.config.expid_aslog_dir(
exp_id).glob('*_run.log') if Path(f).is_file()]
if run_log_files:
latest_run_log: Path = max(run_log_files)
temp_dir = TemporaryDirectory()
try:
compressed_run_log = _compress_file(temp_dir, latest_run_log)
_attach_file(compressed_run_log, message)
except AutosubmitError as e:
Log.printlog(code=e.code, message=e.message)
finally:
if temp_dir:
temp_dir.cleanup()

for mail in mail_to:
message['To'] = email.utils.formataddr((mail, mail))
try:
self._send_mail(self.config.MAIL_FROM, mail, message)
except BaseException as e:
Log.printlog('An error has occurred while sending a mail for warn about remote_platform', 6011)
Log.printlog(
f'Trace:{str(e)}\nAn error has occurred while sending a mail for '
f'warn about remote_platform', 6011)

def notify_status_change(self, exp_id, job_name, prev_status, status, mail_to):
message_text = self._generate_message_text(exp_id, job_name, prev_status, status)
def notify_status_change(
self,
exp_id: str,
job_name: str,
prev_status: str,
status: str,
mail_to: List[str]) -> None:
if not isinstance(mail_to, list):
raise ValueError('mail_to must be a list of emails!')
message_text = _generate_message_text(
exp_id, job_name, prev_status, status)
message = MIMEText(message_text)
message['From'] = email.utils.formataddr(('Autosubmit', self.config.MAIL_FROM))
message['Subject'] = f'[Autosubmit] The job {job_name} status has changed to {str(status)}'
message['From'] = email.utils.formataddr(
('Autosubmit', self.config.MAIL_FROM))
message['Subject'] = f'[Autosubmit] The job {job_name} status has changed to {status}'
message['Date'] = email.utils.formatdate(localtime=True)
for mail in mail_to: # expects a list
message['To'] = email.utils.formataddr((mail, mail))
try:
self._send_mail(self.config.MAIL_FROM, mail, message)
except BaseException as e:
Log.printlog('Trace:{0}\nAn error has occurred while sending a mail for the job {0}'.format(e,job_name), 6011)
Log.printlog(
f'Trace:{str(e)}\nAn error has occurred while sending a mail '
f'for the job {job_name}', 6011)

def _send_mail(self, mail_from, mail_to, message):
server = smtplib.SMTP(self.config.SMTP_SERVER,timeout=60)
server = smtplib.SMTP(self.config.SMTP_SERVER, timeout=60)
server.sendmail(mail_from, mail_to, message.as_string())
server.quit()

@staticmethod
def _generate_message_text(exp_id, job_name, prev_status, status):
return f'Autosubmit notification\n' \
f'-------------------------\n\n' \
f'Experiment id: {str(exp_id)} \n\n' \
+ f'Job name: {str(job_name)} \n\n' \
f'The job status has changed from: {str(prev_status)} to {str(status)} \n\n\n\n\n' \
f'INFO: This message was auto generated by Autosubmit, '\
f'remember that you can disable these messages on Autosubmit config file. \n'

@staticmethod
def _generate_message_experiment_status(exp_id, platform=""):
return f'Autosubmit notification: Remote Platform issues\n' \
f'-------------------------\n\n' \
f'Experiment id:{str(exp_id)} \n\n' \
+ f'Platform affected:{str(platform.name)} using as host:{str(platform.host)} \n\n' \
f'[WARN] Autosubmit encountered an issue with an remote_platform.\n It will resume itself, whenever is possible\n If issue persist, you can change the host IP or put multiple hosts in the platform.yml' + '\n\n\n\n\n' \
f'INFO: This message was auto generated by Autosubmit, '\
f'remember that you can disable these messages on Autosubmit config file. \n'
27 changes: 15 additions & 12 deletions test/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from shutil import rmtree
from tempfile import TemporaryDirectory
from typing import Any, Dict, Callable, List, Protocol, Optional
import os

from autosubmit.autosubmit import Autosubmit
from autosubmit.platforms.slurmplatform import SlurmPlatform, ParamikoPlatform
Expand All @@ -27,7 +28,6 @@ class AutosubmitExperiment:
status_dir: Path
platform: ParamikoPlatform


@pytest.fixture(scope='function')
def autosubmit_exp(autosubmit: Autosubmit, request: pytest.FixtureRequest) -> Callable:
"""Create an instance of ``Autosubmit`` with an experiment."""
Expand All @@ -36,17 +36,21 @@ def autosubmit_exp(autosubmit: Autosubmit, request: pytest.FixtureRequest) -> Ca
tmp_dir = TemporaryDirectory()
tmp_path = Path(tmp_dir.name)


def _create_autosubmit_exp(expid: str):
# directories used when searching for logs to cat
root_dir = tmp_path
BasicConfig.LOCAL_ROOT_DIR = str(root_dir)
exp_path = root_dir / expid
exp_tmp_dir = exp_path / BasicConfig.LOCAL_TMP_DIR
aslogs_dir = exp_tmp_dir / BasicConfig.LOCAL_ASLOG_DIR
status_dir = exp_path / 'status'
aslogs_dir.mkdir(parents=True, exist_ok=True)
status_dir.mkdir(parents=True, exist_ok=True)

exp_path = BasicConfig.expid_dir(expid)

# directories used when searching for logs to cat
exp_tmp_dir = BasicConfig.expid_tmp_dir(expid)
aslogs_dir = BasicConfig.expid_aslog_dir(expid)
status_dir =exp_path / 'status'
if not os.path.exists(aslogs_dir):
os.makedirs(aslogs_dir)
if not os.path.exists(status_dir):
os.makedirs(status_dir)

platform_config = {
"LOCAL_ROOT_DIR": BasicConfig.LOCAL_ROOT_DIR,
"LOCAL_TMP_DIR": str(exp_tmp_dir),
Expand All @@ -59,7 +63,7 @@ def _create_autosubmit_exp(expid: str):
'QUEUING': [],
'FAILED': []
}
submit_platform_script = aslogs_dir / 'submit_local.sh'
submit_platform_script = aslogs_dir.joinpath('submit_local.sh')
submit_platform_script.touch(exist_ok=True)

return AutosubmitExperiment(
Expand Down Expand Up @@ -94,7 +98,7 @@ def autosubmit() -> Autosubmit:
@pytest.fixture(scope='function')
def create_as_conf() -> Callable: # May need to be changed to use the autosubmit_config one
def _create_as_conf(autosubmit_exp: AutosubmitExperiment, yaml_files: List[Path], experiment_data: Dict[str, Any]):
conf_dir = autosubmit_exp.exp_path / 'conf'
conf_dir = autosubmit_exp.exp_path.joinpath('conf')
conf_dir.mkdir(parents=False, exist_ok=False)
basic_config = BasicConfig
parser_factory = YAMLParserFactory()
Expand All @@ -117,7 +121,6 @@ def _create_as_conf(autosubmit_exp: AutosubmitExperiment, yaml_files: List[Path]

return _create_as_conf


class AutosubmitConfigFactory(Protocol): # Copied from the autosubmit config parser, that I believe is a revised one from the create_as_conf

def __call__(self, expid: str, experiment_data: Optional[Dict], *args: Any, **kwargs: Any) -> AutosubmitConfig: ...
Expand Down
Loading

0 comments on commit f7c8c83

Please sign in to comment.