diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 000000000..d3ee6a7ff --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,12 @@ +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v2.5.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-added-large-files + - id: check-toml + - id: flake8 diff --git a/.travis.yml b/.travis.yml index cf6768082..ee3bdb93a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,6 @@ language: python python: - "2.7" - - "3.4" - "3.5" - "3.6" diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 000000000..4cdd0c857 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include maestrowf/datastructures/schemas.json diff --git a/Pipfile b/Pipfile index 6de850fe1..1c689656a 100644 --- a/Pipfile +++ b/Pipfile @@ -4,23 +4,28 @@ verify_ssl = true name = "pypi" [packages] -"enum34" = "*" -filelock = "*" +PyYAML = ">=4.2b1" six = "*" +filelock = "*" tabulate = "*" -Fabric = "*" -PyYAML = ">= 4.2b1" +dill = "*" maestrowf = {path = "."} [dev-packages] -"flake8" = "*" +flake8 = "*" pydocstyle = "*" pylint = "*" tox = "*" coverage = "*" -sphinx_rtd_theme = "*" -Sphinx = "*" pytest = "*" +fabric = "*" +Sphinx = "*" +pytest-cov = "*" +pre-commit = "*" +sphinx-rtd-theme = "*" + +[pipenv] +allow_prereleases = true [requires] python_version = "3.6" diff --git a/README.md b/README.md index 7688ba4d1..6a3d377d0 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,133 @@ -# Maestro Workflow Conductor (MaestroWF) +![](https://github.com/LLNL/maestrowf/raw/develop/assets/logo.png?raw=true "Orchestrate your workflows with ease!") + +# Maestro Workflow Conductor ([maestrowf](https://pypi.org/project/maestrowf/)) + [![Build Status](https://travis-ci.org/LLNL/maestrowf.svg?branch=develop)](https://travis-ci.org/LLNL/maestrowf) [![PyPI](https://img.shields.io/pypi/v/maestrowf.svg)](https://pypi.python.org/pypi?name=maestrowf&version=1.0.0&:action=display) +![Spack](https://img.shields.io/spack/v/py-maestrowf) [![Issues](https://img.shields.io/github/issues/LLNL/maestrowf.svg)](https://github.com/LLNL/maestrowf/issues) [![Forks](https://img.shields.io/github/forks/LLNL/maestrowf.svg)](https://github.com/LLNL/maestrowf/network) [![Stars](https://img.shields.io/github/stars/LLNL/maestrowf.svg)](https://github.com/LLNL/maestrowf/stargazers) [![License](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/LLNL/maestrowf/master/LICENSE) -## Introduction +[![Downloads](https://pepy.tech/badge/maestrowf)](https://pepy.tech/project/maestrowf) +[![Downloads](https://pepy.tech/badge/maestrowf/month)](https://pepy.tech/project/maestrowf/month) +[![Downloads](https://pepy.tech/badge/maestrowf/week)](https://pepy.tech/project/maestrowf/week) -Maestro Workflow Conductor is a Python tool and library for specifying and automating multi-step computational workflows both locally and on supercomputers. Maestro parses a human-readable YAML specification that is self-documenting and portable from one user and environment to another. +Maestro can be installed via [pip](https://pip.pypa.io/): -On the backend, Maestro implements a set of standard interfaces and data structures for handling "study" construction. These objects offer you the ability to use Maestro as a library, and construct your own workflows that suit your own custom needs. 
We also offer other structures that make portable execution on various schedulers much easier than porting scripts by hand. + pip install maestrowf -### Core Concepts +## Documentation -There are many definitions of workflow, so we try to keep it simple and define the term as follows: +* [Maestro Documentation](https://maestrowf.readthedocs.io) +* [Maestro Samples](/samples) + +## Getting Started is Quick and Easy + +Create a `YAML` file named `study.yaml` and paste the following content into the file: + +``` yaml +description: + name: hello_world + description: A simple 'Hello World' study. + +study: + - name: say-hello + description: Say hello to the world! + run: + cmd: | + echo "Hello, World!" > hello_world.txt ``` + +> *PHILOSOPHY*: Maestro believes in the principle of a clearly defined process, specified as a list of tasks, that are self-documenting and clear in their intent. + +Running the `hello_world` study is as simple as... + + maestro run study.yaml + +## Creating a Parameter Study is just as Easy + +With the addition of the `global.parameters` block, and a few simple tweaks to your `study` block, the complete specification should look like this: + +``` yaml +description: + name: hello_planet + description: A simple study to say hello to planets (and Pluto) + +study: + - name: say-hello + description: Say hello to a planet! + run: + cmd: | + echo "Hello, $(PLANET)!" > hello_$(PLANET).txt + +global.parameters: + PLANET: + values: [Mercury, Venus, Earth, Mars, Jupiter, Saturn, Uranus, Neptune, Pluto] + label: PLANET.%% +``` + +> *PHILOSOPHY*: Maestro believes that a workflow should be easily parameterized with minimal modifications to the core process. + +Maestro will automatically expand each parameter into its own isolated workspace, generate a script for each parameter, and automatically monitor execution of each task. + +And, running the study is still as simple as: + +``` bash + maestro run study.yaml +``` + +## Scheduling Made Simple + +But wait there's more! If you want to schedule a study, it's just as simple. With some minor modifications, you are able to run on an [HPC](https://en.wikipedia.org/wiki/Supercomputer) system. + +``` yaml +description: + name: hello_planet + description: A simple study to say hello to planets (and Pluto) + +batch: + type: slurm + queue: pbatch + host: quartz + bank: science + +study: + - name: say-hello + description: Say hello to a planet! + run: + cmd: | + echo "Hello, $(PLANET)!" > hello_$(PLANET).txt + nodes: 1 + procs: 1 + walltime: "00:02:00" + +global.parameters: + PLANET: + values: [Mercury, Venus, Earth, Mars, Jupiter, Saturn, Uranus, Neptune, Pluto] + label: PLANET.%% +``` + +> **NOTE**: This specification is configured to run on LLNL's quartz cluster. Under the `batch` header, you will need to make the necessary changes to schedule onto other HPC resources. +> +> *PHILOSOPHY*: Maestro believes that how a workflow is defined should be decoupled from how it's run. We achieve this capability by providing a seamless interface to multiple schedulers that allows Maestro to readily port workflows to multiple platforms. + +For other samples, see the [samples](/samples) subfolder. To continue with our Hello World example, see the [Basics of Study Construction](https://maestrowf.readthedocs.io/en/latest/hello_world.html) in our [documentation](https://maestrowf.readthedocs.io/en/latest/index.html). 
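Once a study has been launched, Maestro also ships `status` and `cancel` subcommands for keeping an eye on (or stopping) a run. A quick sketch, assuming the timestamped output directory below is the one `maestro run` created for the `hello_planet` study (the exact directory name will differ on your machine):

``` bash
# Report the current state of every step in the study
maestro status ./hello_planet_20200101-000000

# Request a graceful cancellation of the running study
maestro cancel ./hello_planet_20200101-000000
```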
+ +## An Example Study using LULESH + +Maestro comes packed with a basic example using [LULESH](https://github.com/LLNL/LULESH), a proxy application provided by LLNL. You can find the example [here](https://maestrowf.readthedocs.io/en/latest/quick_start.html#). + +## What is Maestro? + +Maestro is an open-source HPC software tool that defines a YAML-based study specification for defining multistep workflows and automates execution of software flows on HPC resources. The core design tenants of Maestro focus on encouraging clear workflow communication and documentation, while making consistent execution easier to allow users to focus on science. Maestro’s study specification helps users think about complex workflows in a step-wise, intent-oriented, manner that encourages modularity and tool reuse. These principles are becoming increasingly important as computational science is continuously more present in scientific fields and has started to require a similar rigor to physical experiment. Maestro is currently in use for multiple projects at Lawrence Livermore National Laboratory and has been used to run existing codes including MFEM, and other simulation codes. It has also been used in other areas including in the training of machine-learned models and more. + +### Maestro's Foundation and Core Concepts + +There are many definitions of workflow, so we try to keep it simple and define the term as follows: + +``` text A set of high level tasks to be executed in some order, with or without dependencies on each other. ``` @@ -24,56 +136,43 @@ We have designed Maestro around the core concept of what we call a "study". A st Maestro's core tenets are defined as follows: ##### Repeatability + A study should be easily repeatable. Like any well-planned and implemented science experiment, the steps themselves should be executed the exact same way each time a study is run over each set of parameters or over different runs of the study itself. ##### Consistent + Studies should be consistently documented and able to be run in a consistent fashion. The removal of variation in the process means less mistakes when executing studies, ease of picking up studies created by others, and uniformity in defining new studies. ##### Self-documenting + Documentation is important in computational studies as much as it is in physical science. The YAML specification defined by Maestro provides a few required key encouraging human-readable documentation. Even further, the specification itself is a documentation of a complete workflow. ---------------- -## Getting Started +## Setting up your Python Environment To get started, we recommend using virtual environments. If you do not have the -Python virtual environment package and wrapper installed follow this [guide](http://python-guide-pt-br.readthedocs.io/en/latest/dev/virtualenvs/). - -### Environment Setup -If you do not have or use virtualenvwrapper: +Python `virtualenv` package installed, take a look at their official [documentation](https://packaging.python.org/guides/installing-using-pip-and-virtual-environments/) to get started. - $ python -m virtualenv venv - $ source venv/bin/activate -Otherwise: +To create a new virtual environment: - $ mkvirtualenv venv + python -m virtualenv maestro_venv + source maestro_venv/bin/activate +### Getting Started for Contributors -Once set up, test the environment. The paths should point to a virtual environment folder. 
- - $ which python - $ which pip - -### Installation - -For general installation, you can install MaestroWF using the following: - - $ pip install maestrowf - -If you plan to develop on MaestroWF, install the repository directly using: +If you plan to develop on Maestro, install the repository directly using: - $ pip install -r requirements.txt - $ pip install -e . + pip install -r requirements.txt + pip install -e . ----------------- - -### Quickstart Example - -MaestroWF comes packed with a basic example using LULESH, a proxy application provided by LLNL. You can find the Quick Start guide [here](https://maestrowf.readthedocs.io/en/latest/quick_start.html#). +Once set up, test the environment. The paths should point to a virtual environment folder. ----------------- + which python + which pip ## Contributors + Many thanks go to MaestroWF's [contributors](https://github.com/LLNL/maestrowf/graphs/contributors). If you have any questions or to submit feature requests please [open a ticket](https://github.com/llnl/maestrowf/issues). diff --git a/assets/logo.png b/assets/logo.png new file mode 100755 index 000000000..b8ebee489 Binary files /dev/null and b/assets/logo.png differ diff --git a/docs/README.md b/docs/README.md index b5dbf4a7a..bad18a4b5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,6 +1,6 @@ # Maestro Workflow Conductor Documentation -[maestrowf.rtfd.io](http://maestrowf.readthedocs.io/en/latest/) +[maestrowf.rtd.io](http://maestrowf.readthedocs.io/en/latest/) This documentation is built with Sphinx for ReadTheDocs. The contents are automatically generated from the doc strings found in the code. diff --git a/docs/source/conf.py b/docs/source/conf.py index 570d5e0e6..6a8382758 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -56,9 +56,9 @@ # built documents. # # The short X.Y version. -version = u'0.0.1dev0' +version = u'1.1' # The full version, including alpha/beta/rc tags. -release = u'0.0.1dev0' +release = u'1.1.7dev1' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/maestrowf/__init__.py b/maestrowf/__init__.py index 7d2d89f53..9a53f8681 100644 --- a/maestrowf/__init__.py +++ b/maestrowf/__init__.py @@ -51,5 +51,5 @@ def emit(self, record): LOGGER = logging.getLogger(__name__) LOGGER.addHandler(NullHandler()) -__version_info__ = ("1", "1", "6") +__version_info__ = ("1", "1", "7") __version__ = '.'.join(__version_info__) diff --git a/maestrowf/abstracts/__init__.py b/maestrowf/abstracts/__init__.py index 5cacc25c6..0f6d0c861 100644 --- a/maestrowf/abstracts/__init__.py +++ b/maestrowf/abstracts/__init__.py @@ -33,23 +33,56 @@ This module contains all of the abstract classes and APIs for defining objects. Abstracts include abstract data stuctures (like a graph), APIs for concepts such as queueing adapters and environment APIs, as well as fundamental data -structures like a SimObject. +structures. """ # NOTE: Some of these abstracts will be moved in the future. The Graph abstract # class does not belong here, and should be moved to something more general. -# NOTE: The SimObject base class may not be required, since it basically -# just requires objects to be dictionaries. 
+import dill +import logging from maestrowf.abstracts.abstractclassmethod import abstractclassmethod from maestrowf.abstracts.envobject import Dependency, Source, Substitution from maestrowf.abstracts.graph import Graph -from maestrowf.abstracts.simobject import SimObject from maestrowf.abstracts.specification import Specification -__all__ = ("abstractclassmethod", "Dependency", "Graph", "SimObject", +__all__ = ("abstractclassmethod", "Dependency", "Graph", "PickleInterface", "Singleton", "Source", "Specification", "Substitution") +LOGGER = logging.getLogger(__name__) + + +class PickleInterface: + """A mixin class that implements a general pickle interface using dill.""" + + @classmethod + def unpickle(cls, path): + """ + Load a pickled instance from a pickle file. + + :param path: Path to a pickle file containing a class instance. + """ + with open(path, 'rb') as pkl: + obj = dill.load(pkl) + + if not isinstance(obj, cls): + msg = "Object loaded from {path} is of type {type}. Expected an" \ + " object of type '{cls}.'".format(path=path, type=type(obj), + cls=type(cls)) + LOGGER.error(msg) + raise TypeError(msg) + + return obj + + def pickle(self, path): + """ + Generate a pickle file of of a class instance. + + :param path: The path to write the pickle to. + """ + with open(path, 'wb') as pkl: + dill.dump(self, pkl) + class _Singleton(type): _instances = {} diff --git a/maestrowf/abstracts/enums/__init__.py b/maestrowf/abstracts/enums/__init__.py index 58bb44d86..fffb1a13e 100644 --- a/maestrowf/abstracts/enums/__init__.py +++ b/maestrowf/abstracts/enums/__init__.py @@ -65,6 +65,7 @@ class State(Enum): TIMEDOUT = 10 UNKNOWN = 11 CANCELLED = 12 + DRYRUN = 13 class StudyStatus(Enum): diff --git a/maestrowf/abstracts/envobject.py b/maestrowf/abstracts/envobject.py index d3d4f083a..32e1685c6 100644 --- a/maestrowf/abstracts/envobject.py +++ b/maestrowf/abstracts/envobject.py @@ -33,13 +33,11 @@ import logging import six -from maestrowf.abstracts.simobject import SimObject - -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) @six.add_metaclass(ABCMeta) -class EnvObject(SimObject): +class EnvObject: """ An abstract class representing objects that exist in a study's environment. @@ -64,7 +62,6 @@ def _verify(self): :returns: True if the EnvObject is verified, False otherwise. """ - pass def _verification(self, error): """ @@ -73,7 +70,7 @@ def _verification(self, error): :param error: String containing a custom error message. """ if not self._verify(): - logger.exception(error) + LOGGER.exception(error) raise ValueError(error) @@ -94,10 +91,8 @@ def substitute(self, data): :returns: A string equal to the original string data with substitutions made (if any were performed). """ - pass -@six.add_metaclass(ABCMeta) class Source(EnvObject): """ Abstract class representing classes that alter environment sourcing. @@ -126,7 +121,6 @@ def apply(self, data): # NOTE: This functionality has not been settled yet. The use of this # class or this design may not be the best for applying script sources # to an environment. - pass @six.add_metaclass(ABCMeta) @@ -158,4 +152,3 @@ def acquire(self, substitutions=None): :param substitutions: List of Substitution objects that can be applied. 
""" - pass diff --git a/maestrowf/abstracts/graph.py b/maestrowf/abstracts/graph.py index 656d644b5..c8e0dd83c 100644 --- a/maestrowf/abstracts/graph.py +++ b/maestrowf/abstracts/graph.py @@ -33,7 +33,7 @@ @six.add_metaclass(ABCMeta) -class Graph(object): +class Graph: """An abstract graph data structure.""" # NOTE: fdinatal -- 04/07/2017 @@ -50,7 +50,6 @@ def add_node(self, name, obj): :param name: String identifier of the node. :param obj: An object representing the value of the node. """ - pass @abstractmethod def add_edge(self, src, dest): @@ -60,7 +59,6 @@ def add_edge(self, src, dest): :param src: Source vertex name. :param dest: Destination vertex name. """ - pass @abstractmethod def remove_edge(self, src, dest): @@ -70,4 +68,3 @@ def remove_edge(self, src, dest): :param src: Source vertex name. :param dest: Destination vertex name. """ - pass diff --git a/maestrowf/abstracts/simobject.py b/maestrowf/abstracts/simobject.py deleted file mode 100644 index 652b4917e..000000000 --- a/maestrowf/abstracts/simobject.py +++ /dev/null @@ -1,64 +0,0 @@ -############################################################################### -# Copyright (c) 2017, Lawrence Livermore National Security, LLC. -# Produced at the Lawrence Livermore National Laboratory -# Written by Francesco Di Natale, dinatale3@llnl.gov. -# -# LLNL-CODE-734340 -# All rights reserved. -# This file is part of MaestroWF, Version: 1.0.0. -# -# For details, see https://github.com/LLNL/maestrowf. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -############################################################################### - -"""The core building block class for study related classes.""" - - -class SimObject(object): - """ - A base class for objects that provides a basic API. - - The SimObject is an object meant to capture the very basic functionality - that core classes and other closesly related classes should adhere to. - The eventual goal of having this base class is to allow a study to be - designed and written in Python code, which allows for those objects to - return a basic dictionary form which could be used to map from one study - specification, format, or otherwise to another. This basic functionality - also allows for a study to be easier to write using standard formats such - as YAML or JSON in order to keep record of how the studies were performed. - """ - - @classmethod - def from_dict(cls, dictionary): - """ - Method for populating a SimObject from a dictionary. - - :param cls: Class to be instantiated. 
- :param dict: Dictionary containing attribute data. - :return: Instance of cls. - """ - instance = cls() - for key, value in dictionary.items(): - instance.__dict__[key] = value - - return instance - - def to_dict(self): - """Return a dictionary version of the SimObject.""" - return self.__dict__ diff --git a/maestrowf/abstracts/specification.py b/maestrowf/abstracts/specification.py index a5b6ae43a..c6c9dbb98 100644 --- a/maestrowf/abstracts/specification.py +++ b/maestrowf/abstracts/specification.py @@ -30,11 +30,11 @@ from abc import ABCMeta, abstractmethod, abstractproperty import six -from maestrowf.abstracts import abstractclassmethod +from . import abstractclassmethod @six.add_metaclass(ABCMeta) -class Specification(object): +class Specification: """ Abstract class for loading and verifying a Study Specification """ @@ -48,7 +48,6 @@ def load_specification(cls, path): :returns: A specification object containing the information loaded from path. """ - pass @abstractclassmethod def load_specification_from_stream(cls, stream): @@ -58,14 +57,12 @@ def load_specification_from_stream(cls, stream): :param stream: Raw text stream containing specification data. :returns: A specification object containing the information in string. """ - pass @abstractmethod def verify(self): """ Verify the whole specification. """ - pass @abstractmethod def get_study_environment(self): @@ -74,7 +71,6 @@ def get_study_environment(self): :returns: A StudyEnvironment object with the data in the specification. """ - pass @abstractmethod def get_parameters(self): @@ -83,7 +79,6 @@ def get_parameters(self): :returns: A ParameterGenerator with data from the specification. """ - pass @abstractmethod def get_study_steps(self): @@ -92,7 +87,6 @@ def get_study_steps(self): :returns: A list of StudyStep objects. """ - pass @abstractproperty def output_path(self): @@ -101,7 +95,6 @@ def output_path(self): :returns: Returns OUTPUT_PATH if it exists, empty string otherwise. """ - pass @abstractproperty def name(self): @@ -110,7 +103,6 @@ def name(self): :returns: The name of the study described by the specification. """ - pass @name.setter def name(self, value): @@ -119,7 +111,6 @@ def name(self, value): :param value: String value representing the new name. """ - pass @abstractproperty def desc(self): @@ -129,7 +120,6 @@ def desc(self): :returns: A string containing the description of the study specification. """ - pass @desc.setter def desc(self, value): @@ -138,4 +128,3 @@ def desc(self, value): :param value: String value representing the new description. """ - pass diff --git a/maestrowf/conductor.py b/maestrowf/conductor.py index 26aebac93..901d00270 100644 --- a/maestrowf/conductor.py +++ b/maestrowf/conductor.py @@ -36,23 +36,73 @@ import os import sys from time import sleep +import dill +import yaml from maestrowf.abstracts.enums import StudyStatus -from maestrowf.datastructures.core import ExecutionGraph -from maestrowf.utils import create_parentdir +from maestrowf.datastructures.core import Study +from maestrowf.utils import create_parentdir, csvtable_to_dict, make_safe_path # Logger instantiation -rootlogger = logging.getLogger(inspect.getmodule(__name__)) -logger = logging.getLogger(__name__) +ROOTLOGGER = logging.getLogger(inspect.getmodule(__name__)) +LOGGER = logging.getLogger(__name__) # Formatting of logger. 
-LFORMAT = "%(asctime)s - %(name)s:%(funcName)s:%(lineno)s - " \ - "%(levelname)s - %(message)s" +LFORMAT = "[%(asctime)s: %(levelname)s] %(message)s" -def setup_argparser(): - """Set up the program's argument parser.""" - parser = ArgumentParser(prog="ExecutionManager", +def setup_logging(name, output_path, log_lvl=2, log_path=None, + log_stdout=False, log_format=None): + """ + Set up logging in the Main class. + :param args: A Namespace object created by a parsed ArgumentParser. + :param name: The name of the log file. + """ + # Check if the user has specified a custom log path. + if log_path: + LOGGER.info( + "Log path overwritten by command line -- %s", log_path) + else: + log_path = os.path.join(output_path, "logs") + + if not log_format: + log_format = LFORMAT + + loglevel = log_lvl * 10 + + # Attempt to create the logging directory. + create_parentdir(log_path) + formatter = logging.Formatter(LFORMAT) + ROOTLOGGER.setLevel(loglevel) + + # Set up handlers + if log_stdout: + handler = logging.StreamHandler() + handler.setFormatter(formatter) + ROOTLOGGER.addHandler(handler) + + log_file = os.path.join(log_path, "{}.log".format(name)) + handler = logging.FileHandler(log_file) + handler.setFormatter(formatter) + ROOTLOGGER.addHandler(handler) + ROOTLOGGER.setLevel(loglevel) + + # Print the level of logging. + LOGGER.info("INFO Logging Level -- Enabled") + LOGGER.warning("WARNING Logging Level -- Enabled") + LOGGER.critical("CRITICAL Logging Level -- Enabled") + LOGGER.debug("DEBUG Logging Level -- Enabled") + + +def setup_parser(): + """ + Set up the Conductors's argument parser. + + :returns: A ArgumentParser that's initialized with the conductor's CLI. + """ + + # Set up the parser for our conductor here. + parser = ArgumentParser(prog="Conductor", description="An application for checking and " "managing an ExecutionDAG within an executing" "study.", @@ -75,137 +125,272 @@ def setup_argparser(): "2 - Info (Default)\n" "1 - Debug") parser.add_argument("-c", "--logstdout", action="store_true", - help="Output logging to stdout in addition to a file.") + help="Output logging to stdout in addition to a " + "file.") parser.add_argument("-t", "--sleeptime", type=int, default=60, - help="Amount of time (in seconds) for the manager to " - "wait between job status checks.") + help="Amount of time (in seconds) for the manager" + " to wait between job status checks.") return parser -def setup_logging(args, name): - """ - Set up logging in the Main class. +class Conductor: + """A class that provides an API for interacting with the Conductor.""" - :param args: A Namespace object created by a parsed ArgumentParser. - :param name: The name of the log file. - """ - # Check if the user has specified a custom log path. - if args.logpath: - logger.info("Log path overwritten by command line -- %s", - args.logpath) - log_path = args.logpath - else: - log_path = os.path.join(args.directory, "logs") + _pkl_extension = ".study.pkl" + _cancel_lock = ".cancel.lock" + _batch_info = "batch.info" - loglevel = args.debug_lvl * 10 + def __init__(self, study): + """ + Create a new instance of a Conductor class. - # Attempt to create the logging directory. - create_parentdir(log_path) - formatter = logging.Formatter(LFORMAT) - rootlogger.setLevel(loglevel) + :param study: An instance of a populated Maestro study. 
+ """ + self._study = study + self._setup = False - # Set up handlers - if args.logstdout: - handler = logging.StreamHandler() - handler.setFormatter(formatter) - rootlogger.addHandler(handler) + @property + def output_path(self): + """ + Return the path representing the root of the study workspace. - log_file = os.path.join(log_path, "{}.log".format(name)) - handler = logging.FileHandler(log_file) - handler.setFormatter(formatter) - rootlogger.addHandler(handler) - rootlogger.setLevel(loglevel) + :returns: A string containing the path to the study's root. + """ + return self._study.output_path - # Print the level of logging. - logger.info("INFO Logging Level -- Enabled") - logger.warning("WARNING Logging Level -- Enabled") - logger.critical("CRITICAL Logging Level -- Enabled") - logger.debug("DEBUG Logging Level -- Enabled") - - -def monitor_study(dag, pickle_path, cancel_lock_path, sleep_time): - """Monitor a running study.""" - logger.debug("\n -------- Calling monitor study -------\n" - "pkl path = %s" - "cancel path = %s" - "sleep time = %s", - pickle_path, cancel_lock_path, sleep_time) - - completion_status = StudyStatus.RUNNING - while completion_status == StudyStatus.RUNNING: - if os.path.exists(cancel_lock_path): - # cancel the study if a cancel lock file is found - cancel_lock = FileLock(cancel_lock_path) + @property + def study_name(self): + """ + Return the name of the study this Conductor instance is managing. + + :returns: A string containing the name of the study. + """ + return self._study.name + + @classmethod + def store_study(cls, study): + """ + Store a Maestro study instance in a way the Conductor can read it. + """ + # Pickle up the Study + pkl_name = "{}{}".format(study.name, cls._pkl_extension) + pkl_path = make_safe_path(study.output_path, pkl_name) + study.pickle(pkl_path) + + @classmethod + def load_batch(cls, out_path): + """ + Load the batch information for the study rooted in 'out_path'. + + :param out_path: A string containing the path to a study root. + :returns: A dict containing the batch information for the study. + """ + batch_path = os.path.join(out_path, cls._batch_info) + + if not os.path.exists(batch_path): + msg = "Batch info files is missing. Please re-run Maestro." + LOGGER.error(msg) + raise Exception(msg) + + with open(batch_path, 'r') as data: try: - with cancel_lock.acquire(timeout=10): - # we have the lock - dag.cancel_study() - os.remove(cancel_lock_path) - logger.info("Study '%s' has been cancelled.", dag.name) - except Timeout: - logger.error("Failed to acquire cancellation lock.") - pass + batch_info = yaml.load(data, yaml.FullLoader) + except AttributeError: + LOGGER.warning( + "*** PyYAML is using an unsafe version with a known " + "load vulnerability. Please upgrade your installation " + "to a more recent version! ***") + batch_info = yaml.load(data) - logger.info("Checking DAG status at %s", str(datetime.now())) - # Execute steps that are ready - # Recieves StudyStatus enum - completion_status = dag.execute_ready_steps() - # Re-pickle the ExecutionGraph. - dag.pickle(pickle_path) - # Write out the state - dag.write_status(os.path.split(pickle_path)[0]) - # Sleep for SLEEPTIME in args if study not complete. - if completion_status == StudyStatus.RUNNING: - sleep(sleep_time) + return batch_info - return completion_status + @classmethod + def store_batch(cls, out_path, batch): + """ + Store the specified batch information to the study in 'out_path'. + :param out_path: A string containing the patht to a study root. 
+ """ + path = os.path.join(out_path, cls._batch_info) + with open(path, "wb") as batch_info: + batch_info.write(yaml.dump(batch).encode("utf-8")) -def main(): - """Run the main segment of the conductor.""" - try: - # Set up and parse the ArgumentParser - parser = setup_argparser() - args = parser.parse_args() + @classmethod + def load_study(cls, out_path): + """ + Load the Study instance in the study root specified by 'out_path'. - # Unpickle the ExecutionGraph - study_pkl = glob.glob(os.path.join(args.directory, "*.pkl")) - # We expect only a single pickle file. - if len(study_pkl) == 1: - dag = ExecutionGraph.unpickle(study_pkl[0]) + :param out_path: A string containing the patht to a study root. + :returns: A string containing the path to the study's root. + """ + study_glob = \ + glob.glob(os.path.join(out_path, "*{}".format(cls._pkl_extension))) + + print(study_glob) + if len(study_glob) == 1: + # We only expect one result.If we only get one, let's assume and + # check after. + path = study_glob[0] + + with open(path, 'rb') as pkl: + obj = dill.load(pkl) + + if not isinstance(obj, Study): + msg = \ + "Object loaded from {path} is of type {type}. Expected " \ + "an object of type '{cls}.'" \ + .format(path=path, type=type(obj), cls=type(Study)) + LOGGER.error(msg) + raise TypeError(msg) else: - if len(study_pkl) > 1: + if len(study_glob) > 1: msg = "More than one pickle found. Expected one. Aborting." - status = 2 else: msg = "No pickle found. Aborting." - status = 1 - sys.stderr.write(msg) - sys.exit(status) + msg = "Corrupted study directory found. {}".format(msg) + raise Exception(msg) + + # Return the Study object + return obj + + @classmethod + def get_status(cls, output_path): + """ + Retrieve the status of the study rooted at 'out_path'. - # Set up logging - setup_logging(args, dag.name) - # Use ExecutionGraph API to determine next jobs to be launched. - logger.info("Checking the ExecutionGraph for study '%s' located in " - "%s...", dag.name, study_pkl[0]) - logger.info("Study Description: %s", dag.description) + :param out_path: A string containing the patht to a study root. + :returns: A dictionary containing the status of the study. + """ + stat_path = os.path.join(output_path, "status.csv") + lock_path = os.path.join(output_path, ".status.lock") + _ = {} + if os.path.exists(stat_path): + lock = FileLock(lock_path) + try: + with lock.acquire(timeout=10): + with open(stat_path, "r") as stat_file: + _ = csvtable_to_dict(stat_file) + except Timeout: + pass + + return _ + + @classmethod + def mark_cancelled(cls, output_path): + """ + Mark the study rooted at 'out_path'. + + :param out_path: A string containing the patht to a study root. + :returns: A dictionary containing the status of the study. + """ + lock_path = make_safe_path(output_path, cls._cancel_lock) + with open(lock_path, 'a'): + os.utime(lock_path, None) + + def initialize(self, batch_info, sleeptime=60): + """ + Initializes the Conductor instance based on the stored study. + + :param batch_info: A dict containing batch information. + :param sleeptime: The amount of sleep time between polling loops + [Default: 60s]. + """ + # Set our conductor's sleep time. + self.sleep_time = sleeptime + # Stage the study. + self._pkl_path, self._exec_dag = self._study.stage() + # Write metadata + self._exec_dag.set_adapter(batch_info) + self._study.store_metadata() + self._setup = True + + def monitor_study(self): + """Monitor a running study.""" + if not self._setup: + msg = \ + "Study '{}' located in '{}' not initialized. 
Initialize " \ + "study before calling launching. Aborting." \ + .format(self.study_name, self.output_path) + LOGGER.error(msg) + raise Exception(msg) + + # Set some fixed variables that monitor will use. + cancel_lock_path = make_safe_path(self.output_path, self._cancel_lock) + dag = self._exec_dag + pkl_path = \ + os.path.join(self._pkl_path, "{}.pkl".format(self._study.name)) + sleep_time = self.sleep_time + + LOGGER.debug( + "\n -------- Calling monitor study -------\n" + "pkl path = %s\n" + "cancel path = %s\n" + "sleep time = %s\n" + "------------------------------------------\n", + pkl_path, cancel_lock_path, sleep_time) - cancel_lock_path = os.path.join(args.directory, ".cancel.lock") - logger.info("Starting to monitor '%s'", dag.name) - completion_status = monitor_study(dag, study_pkl[0], - cancel_lock_path, args.sleeptime) + completion_status = StudyStatus.RUNNING + while completion_status == StudyStatus.RUNNING: + if os.path.exists(cancel_lock_path): + # cancel the study if a cancel lock file is found + cancel_lock = FileLock(cancel_lock_path) + try: + with cancel_lock.acquire(timeout=10): + # we have the lock + dag.cancel_study() + os.remove(cancel_lock_path) + LOGGER.info("Study '%s' has been cancelled.", dag.name) + except Timeout: + LOGGER.error("Failed to acquire cancellation lock.") + pass + + LOGGER.info("Checking DAG status at %s", str(datetime.now())) + # Execute steps that are ready + # Recieves StudyStatus enum + completion_status = dag.execute_ready_steps() + # Re-pickle the ExecutionGraph. + dag.pickle(pkl_path) + # Write out the state + dag.write_status(os.path.split(pkl_path)[0]) + # Sleep for SLEEPTIME in args if study not complete. + if completion_status == StudyStatus.RUNNING: + sleep(sleep_time) + + return completion_status + + def cleanup(self): + self._exec_dag.cleanup() + + +def main(): + """Run the main segment of the conductor.""" + conductor = None + + try: + # Parse the command line args and load the study. + parser = setup_parser() + args = parser.parse_args() + study = Conductor.load_study(args.directory) + setup_logging(study.name, args.directory, args.debug_lvl, + args.logpath, args.logstdout) + batch_info = Conductor.load_batch(args.directory) - logger.info("Cleaning up...") - dag.cleanup() - logger.info("Squeaky clean!") + conductor = Conductor(study) + conductor.initialize(batch_info, args.sleeptime) + completion_status = conductor.monitor_study() - # Explicitly return a 0 status. 
+ LOGGER.info("Study completed with state '%s'.", completion_status) sys.exit(completion_status.value) except Exception as e: - logger.error(e.args, exc_info=True) + LOGGER.error(e.args, exc_info=True) raise e + finally: + if conductor: + LOGGER.info("Study exiting, cleaning up...") + conductor.cleanup() + LOGGER.info("Squeaky clean!") if __name__ == "__main__": diff --git a/maestrowf/datastructures/__init__.py b/maestrowf/datastructures/__init__.py index 2978b4abd..d9bdc1e24 100644 --- a/maestrowf/datastructures/__init__.py +++ b/maestrowf/datastructures/__init__.py @@ -40,9 +40,9 @@ import logging -from maestrowf.datastructures.dag import DAG -from maestrowf.datastructures.yamlspecification import YAMLSpecification +from .dag import DAG +from .yamlspecification import YAMLSpecification __all__ = ("DAG", "YAMLSpecification") -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) diff --git a/maestrowf/datastructures/core/executiongraph.py b/maestrowf/datastructures/core/executiongraph.py index 23afc0ddc..dfcce240c 100644 --- a/maestrowf/datastructures/core/executiongraph.py +++ b/maestrowf/datastructures/core/executiongraph.py @@ -1,26 +1,27 @@ """Module for the execution of DAG workflows.""" from collections import deque, OrderedDict from datetime import datetime -from filelock import FileLock, Timeout import getpass import logging import os -import pickle import shutil import tempfile +from filelock import FileLock, Timeout +from maestrowf.abstracts import PickleInterface from maestrowf.abstracts.enums import JobStatusCode, State, SubmissionCode, \ CancelCode, StudyStatus from maestrowf.datastructures.dag import DAG from maestrowf.datastructures.environment import Variable from maestrowf.interfaces import ScriptAdapterFactory -from maestrowf.utils import create_parentdir, get_duration +from maestrowf.utils import create_parentdir, get_duration, \ + round_datetime_seconds -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) SOURCE = "_source" -class _StepRecord(object): +class _StepRecord: """ A simple container object representing a workflow step record. 
@@ -48,6 +49,7 @@ def __init__(self, workspace, step, **kwargs): """ self.workspace = Variable("WORKSPACE", workspace) step.run["cmd"] = self.workspace.substitute(step.run["cmd"]) + step.run["restart"] = self.workspace.substitute(step.run["restart"]) self.jobid = kwargs.get("jobid", []) self.script = kwargs.get("script", "") @@ -82,10 +84,10 @@ def generate_script(self, adapter, tmp_dir=""): self.step.run["cmd"] = self.workspace.substitute(self.step.run["cmd"]) - logger.info("Generating script for %s into %s", self.name, scr_dir) + LOGGER.info("Generating script for %s into %s", self.name, scr_dir) self.to_be_scheduled, self.script, self.restart_script = \ adapter.write_script(scr_dir, self.step) - logger.info("Script: %s\nRestart: %s\nScheduled?: %s", + LOGGER.info("Script: %s\nRestart: %s\nScheduled?: %s", self.script, self.restart_script, self.to_be_scheduled) def execute(self, adapter): @@ -133,28 +135,28 @@ def _execute(self, adapter, script): def mark_submitted(self): """Mark the submission time of the record.""" - logger.debug( + LOGGER.debug( "Marking %s as submitted (PENDING) -- previously %s", self.name, self.status) self.status = State.PENDING if not self._submit_time: - self._submit_time = datetime.now() + self._submit_time = round_datetime_seconds(datetime.now()) else: - logger.warning( + LOGGER.warning( "Cannot set the submission time of '%s' because it has " "already been set.", self.name ) def mark_running(self): """Mark the start time of the record.""" - logger.debug( + LOGGER.debug( "Marking %s as running (RUNNING) -- previously %s", self.name, self.status) self.status = State.RUNNING if not self._start_time: - self._start_time = datetime.now() + self._start_time = round_datetime_seconds(datetime.now()) def mark_end(self, state): """ @@ -162,18 +164,18 @@ def mark_end(self, state): :param state: State enum corresponding to termination state. """ - logger.debug( + LOGGER.debug( "Marking %s as finished (%s) -- previously %s", self.name, state, self.status) self.status = state if not self._end_time: - self._end_time = datetime.now() + self._end_time = round_datetime_seconds(datetime.now()) def mark_restart(self): """Mark the end time of the record.""" - logger.debug( + LOGGER.debug( "Marking %s as restarting (TIMEOUT) -- previously %s", self.name, self.status) @@ -285,7 +287,7 @@ def restarts(self): return self._num_restarts -class ExecutionGraph(DAG): +class ExecutionGraph(DAG, PickleInterface): """ Datastructure that tracks, executes, and reports on study execution. @@ -301,7 +303,7 @@ class ExecutionGraph(DAG): """ def __init__(self, submission_attempts=1, submission_throttle=0, - use_tmp=False): + use_tmp=False, dry_run=False): """ Initialize a new instance of an ExecutionGraph. @@ -335,6 +337,7 @@ def __init__(self, submission_attempts=1, submission_throttle=0, # throttling, etc. should be listed here. self._submission_attempts = submission_attempts self._submission_throttle = submission_throttle + self.dry_run = dry_run # A map that tracks the dependencies of a step. # NOTE: I don't know how performant the Python dict structure is, but @@ -342,7 +345,7 @@ def __init__(self, submission_attempts=1, submission_throttle=0, # tree or something of that nature to guarantee worst case performance. 
self._dependencies = {} - logger.info( + LOGGER.info( "\n------------------------------------------\n" "Submission attempts = %d\n" "Submission throttle limit = %d\n" @@ -356,14 +359,14 @@ def __init__(self, submission_attempts=1, submission_throttle=0, if self._submission_attempts < 1: _msg = "Submission attempts should always be greater than 0. " \ "Received a value of {}.".format(self._submission_attempts) - logger.error(_msg) + LOGGER.error(_msg) raise ValueError(_msg) if self._submission_throttle < 0: _msg = "Throttling should be 0 for unthrottled or a positive " \ "integer for the number of allowed inflight jobs. " \ "Received a value of {}.".format(self._submission_throttle) - logger.error(_msg) + LOGGER.error(_msg) raise ValueError(_msg) def _check_tmp_dir(self): @@ -415,14 +418,14 @@ def set_adapter(self, adapter): if not isinstance(adapter, dict): msg = "Adapter settings must be contained in a dictionary." - logger.error(msg) + LOGGER.error(msg) raise TypeError(msg) # Check to see that the adapter type is something the if adapter["type"] not in ScriptAdapterFactory.get_valid_adapters(): msg = "'{}' adapter must be specfied in ScriptAdapterFactory." \ .format(adapter) - logger.error(msg) + LOGGER.error(msg) raise TypeError(msg) self._adapter = adapter @@ -438,41 +441,6 @@ def add_description(self, name, description, **kwargs): self._description["description"] = description self._description.update(kwargs) - @classmethod - def unpickle(cls, path): - """ - Load an ExecutionGraph instance from a pickle file. - - :param path: Path to a ExecutionGraph pickle file. - """ - with open(path, 'rb') as pkl: - dag = pickle.load(pkl) - - if not isinstance(dag, cls): - msg = "Object loaded from {path} is of type {type}. Expected an" \ - " object of type '{cls}.'".format(path=path, type=type(dag), - cls=type(cls)) - logger.error(msg) - raise TypeError(msg) - - return dag - - def pickle(self, path): - """ - Generate a pickle file of the graph instance. - - :param path: The path to write the pickle to. - """ - if not self._adapter: - msg = "A script adapter must be set before an ExecutionGraph is " \ - "pickled. Use the 'set_adapter' method to set a specific" \ - " script interface." - logger.error(msg) - raise Exception(msg) - - with open(path, 'wb') as pkl: - pickle.dump(self, pkl) - @property def name(self): """ @@ -514,7 +482,7 @@ def log_description(self): desc = ["{}: {}".format(key, value) for key, value in self._description.items()] desc = "\n".join(desc) - logger.info( + LOGGER.info( "\n==================================================\n" "%s\n" "==================================================\n", @@ -534,11 +502,11 @@ def generate_scripts(self): if not self._adapter: msg = "Adapter not found. Specify a ScriptAdapter using " \ "set_adapter." - logger.error(msg) + LOGGER.error(msg) raise ValueError(msg) # Set up the adapter. - logger.info("Generating scripts...") + LOGGER.info("Generating scripts...") adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"]) adapter = adapter(**self._adapter) @@ -561,7 +529,7 @@ def _execute_record(self, record, adapter, restart=False): :param restart: True if the record needs restarting, False otherwise. """ # Logging for debugging. - logger.info("Calling execute for StepRecord '%s'", record.name) + LOGGER.info("Calling execute for StepRecord '%s'", record.name) num_restarts = 0 # Times this step has temporally restarted. retcode = None # Execution return code. 
@@ -570,38 +538,49 @@ def _execute_record(self, record, adapter, restart=False): # 1. If the JobStatus is not OK. # 2. num_restarts is less than self._submission_attempts self._check_tmp_dir() + + # Only set up the workspace the initial iteration. + if not restart: + LOGGER.debug("Setting up workspace for '%s' at %s", + record.name, str(datetime.now())) + # Generate the script for execution on the fly. + record.setup_workspace() # Generate the workspace. + record.generate_script(adapter, self._tmp_dir) + + if self.dry_run: + record.mark_end(State.DRYRUN) + self.completed_steps.add(record.name) + return + while retcode != SubmissionCode.OK and \ num_restarts < self._submission_attempts: - logger.info("Attempting submission of '%s' (attempt %d of %d)...", + LOGGER.info("Attempting submission of '%s' (attempt %d of %d)...", record.name, num_restarts + 1, self._submission_attempts) # We're not restarting -- submit as usual. if not restart: - logger.debug("Calling 'execute' on '%s' at %s", + LOGGER.debug("Calling 'execute' on '%s' at %s", record.name, str(datetime.now())) - # Generate the script for execution on the fly. - record.setup_workspace() # Generate the workspace. - record.generate_script(adapter, self._tmp_dir) retcode = record.execute(adapter) # Otherwise, it's a restart. else: # If the restart is specified, use the record restart script. - logger.debug("Calling 'restart' on '%s' at %s", + LOGGER.debug("Calling 'restart' on '%s' at %s", record.name, str(datetime.now())) # Generate the script for execution on the fly. record.generate_script(adapter, self._tmp_dir) retcode = record.restart(adapter) # Increment the number of restarts we've attempted. - logger.debug("Completed submission attempt %d", num_restarts) + LOGGER.debug("Completed submission attempt %d", num_restarts) num_restarts += 1 if retcode == SubmissionCode.OK: self.in_progress.add(record.name) if record.is_local_step: - logger.info("Local step %s executed with status OK. Complete.", + LOGGER.info("Local step %s executed with status OK. Complete.", record.name) record.mark_end(State.FINISHED) self.completed_steps.add(record.name) @@ -609,7 +588,7 @@ def _execute_record(self, record, adapter, restart=False): else: # Find the subtree, because anything dependent on this step now # failed. - logger.warning("'%s' failed to submit properly. " + LOGGER.warning("'%s' failed to submit properly. " "Step failed.", record.name) path, parent = self.bfs_subtree(record.name) for node in path: @@ -617,7 +596,7 @@ def _execute_record(self, record, adapter, restart=False): self.values[node].mark_end(State.FAILED) # After execution state debug logging. - logger.debug("After execution of '%s' -- New state is %s.", + LOGGER.debug("After execution of '%s' -- New state is %s.", record.name, record.status) def write_status(self, path): @@ -650,7 +629,7 @@ def write_status(self, path): def _check_study_completion(self): # We cancelled, return True marking study as complete. 
if self.is_canceled: - logger.info("Cancelled -- completing study.") + LOGGER.info("Cancelled -- completing study.") return StudyStatus.CANCELLED # check for completion of all steps @@ -693,14 +672,21 @@ def execute_ready_steps(self): adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"]) adapter = adapter(**self._adapter) - retcode, job_status = self.check_study_status() - logger.debug("Checked status (retcode %s)-- %s", retcode, job_status) + if not self.dry_run: + LOGGER.debug("Checking status check...") + retcode, job_status = self.check_study_status() + else: + LOGGER.debug("DRYRUN: Skipping status check...") + retcode = JobStatusCode.OK + job_status = {} + + LOGGER.debug("Checked status (retcode %s)-- %s", retcode, job_status) # For now, if we can't check the status something is wrong. # Don't modify the DAG. if retcode == JobStatusCode.ERROR: msg = "Job status check failed -- Aborting." - logger.error(msg) + LOGGER.error(msg) raise RuntimeError(msg) elif retcode == JobStatusCode.OK: # For the status of each currently in progress job, check its @@ -708,20 +694,20 @@ def execute_ready_steps(self): cleanup_steps = set() # Steps that are in progress showing failed. for name, status in job_status.items(): - logger.debug("Checking job '%s' with status %s.", name, status) + LOGGER.debug("Checking job '%s' with status %s.", name, status) record = self.values[name] if status == State.FINISHED: # Mark the step complete and notate its end time. record.mark_end(State.FINISHED) - logger.info("Step '%s' marked as finished. Adding to " + LOGGER.info("Step '%s' marked as finished. Adding to " "complete set.", name) self.completed_steps.add(name) self.in_progress.remove(name) elif status == State.RUNNING: # When detect that a step is running, mark it. - logger.info("Step '%s' found to be running.") + LOGGER.info("Step '%s' found to be running.") record.mark_running() elif status == State.TIMEDOUT: @@ -730,13 +716,13 @@ def execute_ready_steps(self): # If we're under the restart limit, attempt a restart. if record.can_restart: if record.mark_restart(): - logger.info( + LOGGER.info( "Step '%s' timed out. Restarting (%s of %s).", name, record.restarts, record.restart_limit ) self._execute_record(record, adapter, restart=True) else: - logger.info("'%s' has been restarted %s of %s " + LOGGER.info("'%s' has been restarted %s of %s " "times. Marking step and all " "descendents as failed.", name, @@ -746,7 +732,7 @@ def execute_ready_steps(self): cleanup_steps.update(self.bfs_subtree(name)[0]) # Otherwise, we can't restart so mark the step timed out. else: - logger.info("'%s' timed out, but cannot be restarted." + LOGGER.info("'%s' timed out, but cannot be restarted." " Marked as TIMEDOUT.", name) # Mark that the step ended due to TIMEOUT. record.mark_end(State.TIMEDOUT) @@ -764,14 +750,14 @@ def execute_ready_steps(self): # TODO: Need to make sure that we do this a finite number # of times. # Resubmit the cmd. - logger.warning("Hardware failure detected. Attempting to " + LOGGER.warning("Hardware failure detected. Attempting to " "resubmit step '%s'.", name) # We can just let the logic below handle submission with # everything else. self.ready_steps.append(name) elif status == State.FAILED: - logger.warning( + LOGGER.warning( "Job failure reported. 
Aborting %s -- flagging all " "dependent jobs as failed.", name @@ -780,8 +766,18 @@ def execute_ready_steps(self): record.mark_end(State.FAILED) cleanup_steps.update(self.bfs_subtree(name)[0]) + elif status == State.UNKNOWN: + record.mark_end(State.UNKNOWN) + LOGGER.info( + "Step '%s' found in UNKNOWN state. Step was found " + "in '%s' state previously, marking as UNKNOWN. " + "Adding to failed steps.", + name, record.status) + cleanup_steps.update(self.bfs_subtree(name)[0]) + self.in_progress.remove(name) + elif status == State.CANCELLED: - logger.info("Step '%s' was cancelled.", name) + LOGGER.info("Step '%s' was cancelled.", name) self.in_progress.remove(name) record.mark_end(State.CANCELLED) @@ -801,17 +797,17 @@ def execute_ready_steps(self): # A completed step by definition has had its dependencies met. # Skip it. if key in self.completed_steps: - logger.debug("'%s' in completed set, skipping.", key) + LOGGER.debug("'%s' in completed set, skipping.", key) continue - logger.debug("Checking %s -- %s", key, record.jobid) + LOGGER.debug("Checking %s -- %s", key, record.jobid) # If the record is only INITIALIZED, we have encountered a step # that needs consideration. if record.status == State.INITIALIZED: - logger.debug("'%s' found to be initialized. Checking " + LOGGER.debug("'%s' found to be initialized. Checking " "dependencies. ", key) - logger.debug( + LOGGER.debug( "Unfulfilled dependencies: %s", self._dependencies[key]) @@ -820,7 +816,7 @@ def execute_ready_steps(self): self._dependencies[key]) self._dependencies[key] = \ self._dependencies[key] - set(s_completed) - logger.debug( + LOGGER.debug( "Completed dependencies: %s\n" "Remaining dependencies: %s", s_completed, self._dependencies[key]) @@ -828,16 +824,16 @@ def execute_ready_steps(self): # If the gating dependencies set is empty, we can execute. if not self._dependencies[key]: if key not in self.ready_steps: - logger.debug("All dependencies completed. Staging.") + LOGGER.debug("All dependencies completed. Staging.") self.ready_steps.append(key) else: - logger.debug("Already staged. Passing.") + LOGGER.debug("Already staged. Passing.") continue # We now have a collection of ready steps. Execute. # If we don't have a submission limit, go ahead and submit all. if self._submission_throttle == 0: - logger.info("Launching all ready steps...") + LOGGER.info("Launching all ready steps...") _available = len(self.ready_steps) # Else, we have a limit -- adhere to it. else: @@ -850,7 +846,7 @@ def execute_ready_steps(self): # computed number of slots. We could have free slots, but have less # in the queue. _available = min(_available, len(self.ready_steps)) - logger.info("Found %d available slots...", _available) + LOGGER.info("Found %d available slots...", _available) for i in range(0, _available): # Pop the record and execute using the helper method. @@ -858,12 +854,12 @@ def execute_ready_steps(self): # If we get to this point and we've cancelled, cancel the record. if self.is_canceled: - logger.info("Cancelling '%s' -- continuing.", _record.name) + LOGGER.info("Cancelling '%s' -- continuing.", _record.name) _record.mark_end(State.CANCELLED) self.cancelled_steps.add(_record.name) continue - logger.debug("Launching job %d -- %s", i, _record.name) + LOGGER.debug("Launching job %d -- %s", i, _record.name) self._execute_record(_record, adapter) # check the status of the study upon finishing this round of execution @@ -897,14 +893,14 @@ def check_study_status(self): # Based on return code, log something different. 
if retcode == JobStatusCode.OK: - logger.info("Jobs found for user '%s'.", getpass.getuser()) + LOGGER.info("Jobs found for user '%s'.", getpass.getuser()) return retcode, step_status elif retcode == JobStatusCode.NOJOBS: - logger.info("No jobs found.") + LOGGER.info("No jobs found.") return retcode, step_status else: msg = "Unknown Error (Code = {})".format(retcode) - logger.error(msg) + LOGGER.error(msg) return retcode, step_status def cancel_study(self): @@ -923,12 +919,12 @@ def cancel_study(self): self.is_canceled = True if crecord.cancel_status == CancelCode.OK: - logger.info("Successfully requested to cancel all jobs.") + LOGGER.info("Successfully requested to cancel all jobs.") elif crecord.cancel_status == CancelCode.ERROR: - logger.error( + LOGGER.error( "Failed to cancel jobs. (Code = %s)", crecord.return_code) else: - logger.error("Unknown Error (Code = %s)", crecord.return_code) + LOGGER.error("Unknown Error (Code = %s)", crecord.return_code) return crecord.cancel_status diff --git a/maestrowf/datastructures/core/parameters.py b/maestrowf/datastructures/core/parameters.py index b566cb656..b0be5c85e 100644 --- a/maestrowf/datastructures/core/parameters.py +++ b/maestrowf/datastructures/core/parameters.py @@ -39,8 +39,6 @@ import logging import re -from maestrowf.abstracts import SimObject - logger = logging.getLogger(__name__) @@ -161,7 +159,7 @@ def apply(self, item): return item -class ParameterGenerator(SimObject): +class ParameterGenerator: """ Class for containing parameters and generating combinations. diff --git a/maestrowf/datastructures/core/study.py b/maestrowf/datastructures/core/study.py index 928b26546..e6de13a73 100644 --- a/maestrowf/datastructures/core/study.py +++ b/maestrowf/datastructures/core/study.py @@ -34,14 +34,15 @@ import os import pickle import re +from types import MethodType import yaml -from maestrowf.abstracts import SimObject -from maestrowf.datastructures.core import ExecutionGraph +from maestrowf.abstracts import PickleInterface from maestrowf.datastructures.dag import DAG from maestrowf.utils import apply_function, create_parentdir, make_safe_path +from .executiongraph import ExecutionGraph -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) SOURCE = "_source" WSREGEX = re.compile( r"\$\(([-!\$%\^&\*\(\)_\+\|~=`{}\[\]:;<>\?,\.\/\w]+)\.workspace\)" @@ -51,7 +52,7 @@ ) -class StudyStep(SimObject): +class StudyStep: """ Class that represents the data and API for a single study step. @@ -121,7 +122,7 @@ def __ne__(self, other): return not self.__eq__(other) -class Study(DAG): +class Study(DAG, PickleInterface): """ Collection of high level objects to perform study construction. @@ -195,15 +196,17 @@ def __init__(self, name, description, self._out_path = out_path self._meta_path = os.path.join(out_path, "meta") - logger.info("OUTPUT_PATH = %s", out_path) + LOGGER.debug("OUTPUT_PATH = %s", out_path) # Flag the study as not having been set up and add the source node. self._issetup = False + self.is_configured = False self.add_node(SOURCE, None) # Settings for handling restarts and submission attempts. self._restart_limit = 0 self._submission_attempts = 0 self._use_tmp = False + self._dry_run = False # Management structures # The workspace used by each step. @@ -308,7 +311,7 @@ def load_metadata(self): msg = "Object loaded from {path} is of type {type}. 
Expected an" \ " object of type '{cls}.'".format(path=path, type=type(env), cls=type(self)) - logger.error(msg) + LOGGER.error(msg) raise TypeError(msg) metapath = os.path.join(self._meta_path, "metadata.yaml") @@ -335,7 +338,7 @@ def add_step(self, step): """ # Add the node to the DAG. self.add_node(step.name, step) - logger.info( + LOGGER.info( "Adding step '%s' to study '%s'...", step.name, self.name) # Apply the environment to the incoming step. step.__dict__ = \ @@ -344,7 +347,7 @@ def add_step(self, step): # If the step depends on a prior step, create an edge. if "depends" in step.run and step.run["depends"]: for dependency in step.run["depends"]: - logger.info("{0} is dependent on {1}. Creating edge (" + LOGGER.info("{0} is dependent on {1}. Creating edge (" "{1}, {0})...".format(step.name, dependency)) if "*" not in dependency: self.add_edge(dependency, step.name) @@ -375,21 +378,22 @@ def walk_study(self, src=SOURCE): def setup_workspace(self): """Set up the study's main workspace directory.""" try: - logger.info("Setting up study workspace in '%s'", self._out_path) + LOGGER.debug("Setting up study workspace in '%s'", self._out_path) create_parentdir(self._out_path) except Exception as e: - logger.error(e.args) + LOGGER.error(e.args) return False def setup_environment(self): """Set up the environment by acquiring outside dependencies.""" # Set up the environment if it hasn't been already. if not self.environment.is_set_up: - logger.info("Environment is setting up.") + LOGGER.debug("Environment is setting up.") self.environment.acquire_environment() def configure_study(self, submission_attempts=1, restart_limit=1, - throttle=0, use_tmp=False, hash_ws=False): + throttle=0, use_tmp=False, hash_ws=False, + dry_run=False): """ Perform initial configuration of a study. \ @@ -404,6 +408,8 @@ def configure_study(self, submission_attempts=1, restart_limit=1, denotes no cap].\ :param use_tmp: Boolean value specifying if the generated \ ExecutionGraph dumps its information into a temporary directory. \ + :param dry_run: Boolean value that toggles dry run to just generate \ + study workspaces and scripts without execution or status checking. \ :returns: True if the Study is successfully setup, False otherwise. \ """ @@ -412,20 +418,24 @@ def configure_study(self, submission_attempts=1, restart_limit=1, self._submission_throttle = throttle self._use_tmp = use_tmp self._hash_ws = hash_ws + self._dry_run = dry_run - logger.info( + LOGGER.info( "\n------------------------------------------\n" - "Output path = %s\n" "Submission attempts = %d\n" "Submission restart limit = %d\n" "Submission throttle limit = %d\n" "Use temporary directory = %s\n" "Hash workspaces = %s\n" + "Dry run enabled = %s\n" + "Output path = %s\n" "------------------------------------------", - self._out_path, submission_attempts, restart_limit, throttle, - use_tmp, hash_ws + submission_attempts, restart_limit, throttle, + use_tmp, hash_ws, dry_run, self._out_path ) + self.is_configured = True + def _stage(self, dag): """ Set up the ExecutionGraph of a parameterized study. @@ -436,7 +446,7 @@ def _stage(self, dag): steps. """ # Items to store that should be reset. - logger.info( + LOGGER.info( "\n==================================================\n" "Constructing parameter study '%s'\n" "==================================================\n", @@ -462,7 +472,7 @@ def _stage(self, dag): # used parameters of the step, and then adding all parameterized # combinations of funneled steps. 
for step in t_sorted: - logger.info( + LOGGER.info( "\n==================================================\n" "Processing step '%s'\n" "==================================================\n", @@ -470,7 +480,7 @@ def _stage(self, dag): ) # If we encounter SOURCE, just add it and continue. if step == SOURCE: - logger.info("Encountered '%s'. Adding and continuing.", SOURCE) + LOGGER.info("Encountered '%s'. Adding and continuing.", SOURCE) dag.add_node(SOURCE, None) continue @@ -484,15 +494,15 @@ def _stage(self, dag): s_params = self.parameters.get_used_parameters(node) p_params = set() # Used parameters excluding the current step. # Iterate through dependencies to update the p_params - logger.debug("\n*** Processing dependencies ***") + LOGGER.debug("\n*** Processing dependencies ***") for parent in node.run["depends"]: # If we have a dependency that is parameter independent, add # it to the hub dependency set. if "*" in parent: - logger.debug("Found funnel dependency -- %s", parent) + LOGGER.debug("Found funnel dependency -- %s", parent) self.hub_depends[step].add(re.sub(ALL_COMBOS, "", parent)) else: - logger.debug("Found dependency -- %s", parent) + LOGGER.debug("Found dependency -- %s", parent) # Otherwise, just note the parameters used by the step. self.depends[step].add(parent) p_params |= self.used_params[parent] @@ -500,24 +510,25 @@ def _stage(self, dag): # Search for workspace matches. These affect the expansion of a # node because they may use parameters. These are likely to cause # a node to fall into the 'Parameter Dependent' case. - used_spaces = re.findall(WSREGEX, node.run["cmd"]) + used_spaces = re.findall( + WSREGEX, "{} {}".format(node.run["cmd"], node.run["restart"])) for ws in used_spaces: if ws not in self.used_params: msg = "Workspace for '{}' is being used before it would" \ " be generated.".format(ws) - logger.error(msg) + LOGGER.error(msg) raise Exception(msg) # We have the case that if we're using a workspace of a step # that is a parameter independent dependency, we can skip it. # The parameters don't affect the combinations. if ws in self.hub_depends[step]: - logger.info( + LOGGER.info( "'%s' parameter independent association found. " "Skipping.", ws) continue - logger.debug( + LOGGER.debug( "Found workspace '%s' using parameters %s", ws, self.used_params[ws]) p_params |= self.used_params[ws] @@ -534,7 +545,7 @@ def _stage(self, dag): # 1. The step and all its preceding parents use no parameters. if not self.used_params[step]: - logger.info( + LOGGER.info( "\n-------------------------------------------------\n" "Adding step '%s' (No parameters used)\n" "-------------------------------------------------\n", @@ -546,23 +557,23 @@ def _stage(self, dag): workspace = make_safe_path(self._out_path, *[step]) self.workspaces[step] = workspace - logger.debug("Workspace: %s", workspace) + LOGGER.debug("Workspace: %s", workspace) # NOTE: I don't think it's valid to have a specific workspace # since a step with no parameters operates at the global level. # NOTE: Opting to save the old command for provenence reasons. 
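# --- Editor's note (illustrative sketch, not part of the patch) ---------------
# How a $(<step>.workspace) token is picked up by WSREGEX during staging. The
# step name "make-data" and the command string are made up for the example;
# the pattern is the module-level WSREGEX defined in study.py.
import re

WSREGEX = re.compile(
    r"\$\(([-!\$%\^&\*\(\)_\+\|~=`{}\[\]:;<>\?,\.\/\w]+)\.workspace\)"
)
cmd = "cat $(make-data.workspace)/results.txt > summary.txt"
print(re.findall(WSREGEX, cmd))  # -> ['make-data']
# Each match is later replaced with the concrete workspace path for that step.
# ------------------------------------------------------------------------------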
cmd = node.run["cmd"] r_cmd = node.run["restart"] - logger.info("Searching for workspaces...\ncmd = %s", cmd) + LOGGER.info("Searching for workspaces...\ncmd = %s", cmd) for match in used_spaces: - logger.info("Workspace found -- %s", match) + LOGGER.info("Workspace found -- %s", match) workspace_var = "$({}.workspace)".format(match) if match in self.hub_depends[step]: # If we're looking at a parameter independent match # the workspace is the folder that contains all of # the outputs of all combinations for the step. ws = make_safe_path(self._out_path, *[match]) - logger.info("Found funnel workspace -- %s", ws) + LOGGER.info("Found funnel workspace -- %s", ws) else: ws = self.workspaces[match] cmd = cmd.replace(workspace_var, ws) @@ -571,35 +582,36 @@ def _stage(self, dag): # here, it's reflected in the ExecutionGraph. node = copy.deepcopy(node) node.run["cmd"] = cmd - logger.debug("New cmd = %s", cmd) - logger.debug("New restart = %s", r_cmd) + node.run["restart"] = r_cmd + LOGGER.debug("New cmd = %s", cmd) + LOGGER.debug("New restart = %s", r_cmd) dag.add_step(step, node, workspace, rlimit) if self.depends[step] or self.hub_depends[step]: # So, because we don't have used parameters, we can just # loop over the dependencies and add them. - logger.debug("Processing regular dependencies.") + LOGGER.debug("Processing regular dependencies.") for parent in self.depends[step]: - logger.info("Adding edge (%s, %s)...", parent, step) + LOGGER.info("Adding edge (%s, %s)...", parent, step) dag.add_connection(parent, step) # We can still have a case where we have steps that do # funnel into this one even though this particular step # is not parameterized. - logger.debug("Processing hub dependencies.") + LOGGER.debug("Processing hub dependencies.") for parent in self.hub_depends[step]: for item in self.step_combos[parent]: - logger.info("Adding edge (%s, %s)...", item, step) + LOGGER.info("Adding edge (%s, %s)...", item, step) dag.add_connection(item, step) else: # Otherwise, just add source since we're not dependent. - logger.debug("Adding edge (%s, %s)...", SOURCE, step) + LOGGER.debug("Adding edge (%s, %s)...", SOURCE, step) dag.add_connection(SOURCE, step) # 2. The step has used parameters. else: - logger.info( + LOGGER.info( "\n==================================================\n" "Expanding step '%s'\n" "==================================================\n" @@ -610,7 +622,7 @@ def _stage(self, dag): ) # Now we iterate over the combinations and expand the step. for combo in self.parameters: - logger.info("\n**********************************\n" + LOGGER.info("\n**********************************\n" "Combo [%s]\n" "**********************************", str(combo)) @@ -623,7 +635,7 @@ def _stage(self, dag): else: workspace = \ make_safe_path(self._out_path, *[step, combo_str]) - logger.debug("Workspace: %s", workspace) + LOGGER.debug("Workspace: %s", workspace) combo_str = "{}_{}".format(step, combo_str) self.workspaces[combo_str] = workspace @@ -639,23 +651,23 @@ def _stage(self, dag): # Substitute workspaces into the combination. cmd = step_exp.run["cmd"] r_cmd = step_exp.run["restart"] - logger.info("Searching for workspaces...\ncmd = %s", cmd) + LOGGER.info("Searching for workspaces...\ncmd = %s", cmd) for match in used_spaces: # Construct the workspace variable. 
- logger.info("Workspace found -- %s", ws) + LOGGER.info("Workspace found -- %s", ws) workspace_var = "$({}.workspace)".format(match) if match in self.hub_depends[step]: # If we're looking at a parameter independent match # the workspace is the folder that contains all of # the outputs of all combinations for the step. ws = make_safe_path(self._out_path, *[match]) - logger.info("Found funnel workspace -- %s", ws) + LOGGER.info("Found funnel workspace -- %s", ws) elif not self.used_params[match]: # If it's not a funneled dependency and the match # is not parameterized, then the workspace is just # the unparameterized match. ws = self.workspaces[match] - logger.info( + LOGGER.info( "Found unparameterized workspace -- %s", match) else: # Otherwise, we're dealing with a combination. @@ -663,14 +675,14 @@ def _stage(self, dag): match, combo.get_param_string(self.used_params[match]) ) - logger.info( + LOGGER.info( "Found parameterized workspace -- %s", ws) ws = self.workspaces[ws] # Replace in both the command and restart command. cmd = cmd.replace(workspace_var, ws) r_cmd = r_cmd.replace(workspace_var, ws) - logger.info("New cmd = %s", cmd) + LOGGER.info("New cmd = %s", cmd) step_exp.run["cmd"] = cmd step_exp.run["restart"] = r_cmd @@ -680,14 +692,14 @@ def _stage(self, dag): if self.depends[step] or self.hub_depends[step]: # So, because we don't have used parameters, we can # just loop over the dependencies and add them. - logger.info("Processing regular dependencies.") + LOGGER.info("Processing regular dependencies.") for p in self.depends[step]: if self.used_params[p]: p = "{}_{}".format( p, combo.get_param_string(self.used_params[p]) ) - logger.info( + LOGGER.info( "Adding edge (%s, %s)...", p, combo_str ) dag.add_connection(p, combo_str) @@ -695,16 +707,16 @@ def _stage(self, dag): # We can still have a case where we have steps that do # funnel into this one even though this particular step # is not parameterized. - logger.debug("Processing hub dependencies.") + LOGGER.debug("Processing hub dependencies.") for parent in self.hub_depends[step]: for item in self.step_combos[parent]: - logger.info( + LOGGER.info( "Adding edge (%s, %s)...", item, combo_str ) dag.add_connection(item, combo_str) else: # Otherwise, just add source since we're not dependent. - logger.debug( + LOGGER.debug( "Adding edge (%s, %s)...", SOURCE, combo_str ) dag.add_connection(SOURCE, combo_str) @@ -725,7 +737,7 @@ def _stage_linear(self, dag): for step in t_sorted: # If we find the source node, we can just add it and continue. if step == SOURCE: - logger.debug("Source node found.") + LOGGER.debug("Source node found.") dag.add_node(SOURCE, None) continue @@ -748,12 +760,12 @@ def _stage_linear(self, dag): cmd = node.run["cmd"] r_cmd = node.run["restart"] - logger.info("Searching for workspaces...\ncmd = %s", cmd) + LOGGER.info("Searching for workspaces...\ncmd = %s", cmd) used_spaces = re.findall(WSREGEX, cmd) for match in used_spaces: # In this case we don't need to look for any parameters, or # combination depdendent ("funnel") steps. It's a simple sub. - logger.info("Workspace found -- %s", match) + LOGGER.info("Workspace found -- %s", match) workspace_var = "$({}.workspace)".format(match) ws = self.workspaces[match] cmd = cmd.replace(workspace_var, ws) @@ -792,14 +804,14 @@ def stage(self): if not os.path.exists(self._out_path): msg = "Study {} is not set up for staging. 
Workspace does not " \ "exists (Output Dir = {}).".format(self.name, self._out_path) - logger.error(msg) + LOGGER.error(msg) raise Exception(msg) # If the environment isn't set up, raise an exception. if not self.environment.is_set_up: msg = "Study {} is not set up for staging. Environment is not " \ "set up. Aborting.".format(self.name) - logger.error(msg) + LOGGER.error(msg) raise Exception(msg) # After substituting, we should start getting combinations and @@ -825,8 +837,17 @@ def stage(self): dag = ExecutionGraph( submission_attempts=self._submission_attempts, submission_throttle=self._submission_throttle, - use_tmp=self._use_tmp) + use_tmp=self._use_tmp, dry_run=self._dry_run) dag.add_description(**self.description) dag.log_description() + # Because we're working within a Study class whose steps have already + # been verified to not contain a cycle, we can override the check for + # the execution graph. Because the execution graph is constructed from + # the study steps, it won't contain a cycle. + def _pass_detect_cycle(self): + pass + + dag.detect_cycle = MethodType(_pass_detect_cycle, dag) + return self._out_path, self._stage(dag) diff --git a/maestrowf/datastructures/core/studyenvironment.py b/maestrowf/datastructures/core/studyenvironment.py index 4d1de699a..06ef74ecf 100644 --- a/maestrowf/datastructures/core/studyenvironment.py +++ b/maestrowf/datastructures/core/studyenvironment.py @@ -31,12 +31,12 @@ import logging -from maestrowf.abstracts import SimObject, Dependency, Source, Substitution +from maestrowf.abstracts import Dependency, Source, Substitution -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) -class StudyEnvironment(SimObject): +class StudyEnvironment: """ StudyEnvironment for managing a study environment. @@ -58,7 +58,7 @@ def __init__(self): # Boolean that tracks if dependencies have been acquired. self._is_set_up = False - logger.debug("Initialized an empty StudyEnvironment.") + LOGGER.debug("Initialized an empty StudyEnvironment.") def __bool__(self): """ @@ -88,44 +88,44 @@ def add(self, item): # because the necessary variable could have not been added yet # and there's too much of a need to process a dependency first. name = None - logger.debug("Calling add with %s", str(item)) + LOGGER.debug("Calling add with %s", str(item)) if isinstance(item, Dependency): - logger.debug("Adding %s of type %s.", item.name, type(item)) - logger.debug("Value: %s.", item.__dict__) + LOGGER.debug("Adding %s of type %s.", item.name, type(item)) + LOGGER.debug("Value: %s.", item.__dict__) self.dependencies[item.name] = item name = item.name self._is_set_up = False elif isinstance(item, Substitution): - logger.debug("Value: %s", item.value) - logger.debug("Tokens: %s", self._tokens) + LOGGER.debug("Value: %s", item.value) + LOGGER.debug("Tokens: %s", self._tokens) name = item.name - logger.debug("Adding %s of type %s.", item.name, type(item)) + LOGGER.debug("Adding %s of type %s.", item.name, type(item)) if ( isinstance(item.value, str) and any(token in item.value for token in self._tokens)): - logger.debug("Label detected. Adding %s to labels", item.name) + LOGGER.debug("Label detected. 
Adding %s to labels", item.name) self.labels[item.name] = item else: self._tokens.add(item.token) self.substitutions[item.name] = item elif isinstance(item, Source): - logger.debug("Adding source %s", item.source) - logger.debug("Item source: %s", item.source) + LOGGER.debug("Adding source %s", item.source) + LOGGER.debug("Item source: %s", item.source) self.sources.append(item) else: error = "Received an item of type {}. Expected an item of base " \ "type Substitution, Source, or Dependency." \ .format(type(item)) - logger.exception(error) + LOGGER.exception(error) raise TypeError(error) if name and name in self._names: error = "A duplicate name '{}' has been detected. All names " \ "must be unique. Aborting.".format(name) - logger.exception(error) + LOGGER.exception(error) raise ValueError(error) else: - logger.debug("{} added to set of names.".format(name)) + LOGGER.debug("{} added to set of names.".format(name)) self._names.add(name) def find(self, key): @@ -136,20 +136,20 @@ def find(self, key): :returns: The environment object labeled by key, None if key is not found. """ - logger.debug("Looking for '%s'...", key) + LOGGER.debug("Looking for '%s'...", key) if key in self.dependencies: - logger.debug("Found '%s' in environment dependencies.", key) + LOGGER.debug("Found '%s' in environment dependencies.", key) return self.dependencies[key] if key in self.substitutions: - logger.debug("Found '%s' in environment substitutions.", key) + LOGGER.debug("Found '%s' in environment substitutions.", key) return self.substitutions[key] if key in self.labels: - logger.debug("Found '%s' in environment labels.", key) + LOGGER.debug("Found '%s' in environment labels.", key) return self.labels[key] - logger.debug("'%s' not found -- \n%s", key, self) + LOGGER.debug("'%s' not found -- \n%s", key, self) return None def remove(self, key): @@ -159,7 +159,7 @@ def remove(self, key): :param key: Name of the environment object to remove. :returns: The environment object labeled by key. """ - logger.debug("Looking to remove '%s'...", key) + LOGGER.debug("Looking to remove '%s'...", key) if key not in self._names: return None @@ -179,18 +179,18 @@ def remove(self, key): self._names.remove(key) return _ - logger.debug("'%s' not found -- \n%s", key, self) + LOGGER.debug("'%s' not found -- \n%s", key, self) return None def acquire_environment(self): """Acquire any environment items that may be stored remotely.""" if self._is_set_up: - logger.info("Environment already set up. Returning.") + LOGGER.info("Environment already set up. 
Returning.") return - logger.info("Acquiring dependencies") + LOGGER.debug("Acquiring dependencies") for dependency, value in self.dependencies.items(): - logger.info("Acquiring -- %s", dependency) + LOGGER.info("Acquiring -- %s", dependency) value.acquire(substitutions=self.substitutions.values()) self._is_set_up = True @@ -205,24 +205,24 @@ def apply_environment(self, item): if not item: return item - logger.debug("Applying environment to %s", item) - logger.debug("Processing labels...") + LOGGER.debug("Applying environment to %s", item) + LOGGER.debug("Processing labels...") for label, value in self.labels.items(): - logger.debug("Looking for %s in %s", label, item) + LOGGER.debug("Looking for %s in %s", label, item) item = value.substitute(item) - logger.debug("After substitution: %s", item) + LOGGER.debug("After substitution: %s", item) - logger.debug("Processing dependencies...") + LOGGER.debug("Processing dependencies...") for label, dependency in self.dependencies.items(): - logger.debug("Looking for %s in %s", label, item) + LOGGER.debug("Looking for %s in %s", label, item) item = dependency.substitute(item) - logger.debug("After substitution: %s", item) - logger.debug("Acquiring %s.", label) + LOGGER.debug("After substitution: %s", item) + LOGGER.debug("Acquiring %s.", label) - logger.debug("Processing substitutions...") + LOGGER.debug("Processing substitutions...") for substitution, value in self.substitutions.items(): - logger.debug("Looking for %s in %s", substitution, item) + LOGGER.debug("Looking for %s in %s", substitution, item) item = value.substitute(item) - logger.debug("After substitution: %s", item) + LOGGER.debug("After substitution: %s", item) return item diff --git a/maestrowf/datastructures/schemas.json b/maestrowf/datastructures/schemas.json new file mode 100644 index 000000000..27b501b17 --- /dev/null +++ b/maestrowf/datastructures/schemas.json @@ -0,0 +1,117 @@ +{ + "DESCRIPTION": { + "type": "object", + "properties": { + "name": {"type": "string", "minLength": 1}, + "description": {"type": "string", "minLength": 1} + }, + "required": [ + "name", + "description" + ] + }, + "PARAM": { + "type": "object", + "properties": { + "values": { + "type": "array" + }, + "label": {"type": "string", "minLength": 1} + }, + "additionalProperties": false, + "required": [ + "values", + "label" + ] + }, + "STUDY_STEP": { + "type": "object", + "properties": { + "name": {"type": "string", "minLength": 1}, + "description": {"type": "string", "minLength": 1}, + "run": { + "type": "object", + "properties": { + "cmd": {"type": "string", "minLength": 1}, + "depends": {"type": "array", "uniqueItems": true}, + "pre": {"type": "string", "minLength": 1}, + "post": {"type": "string", "minLength": 1}, + "restart": {"type": "string", "minLength": 1}, + "nodes": {"type": "integer"}, + "procs": {"type": "integer"}, + "gpus": {"type": "integer"}, + "cores per task": {"type": "integer"}, + "walltime": {"type": "string", "minLength": 1}, + "reservation": {"type": "string", "minLength": 1}, + "exclusive": {"type": "boolean"} + }, + "additionalProperties": false, + "required": [ + "cmd" + ] + } + }, + "additionalProperties": false, + "required": [ + "name", + "description", + "run" + ] + }, + "ENV": { + "type": "object", + "properties": { + "variables": {"type": "object"}, + "labels": {"type": "object"}, + "sources": {"type": "array"}, + "dependencies": { + "type": "object", + "properties": { + "paths": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": {"type": 
"string", "minLength": 1}, + "path": {"type": "string", "minLength": 1} + }, + "required": [ + "name", + "path" + ], + "additionalProperties": false + } + }, + "git": { + "type": "array", + "items": { + "properties": { + "name": {"type": "string", "minLength": 1}, + "path": {"type": "string", "minLength": 1}, + "url": {"type": "string", "minLength": 1}, + "tag": {"type": "string", "minLength": 1} + }, + "required": [ + "name", + "path", + "url" + ] + } + }, + "spack": { + "type": "object", + "properties": { + "name": {"type": "string", "minLength": 1}, + "package_name": {"type": "string", "minLength": 1} + }, + "required": [ + "type", + "package_name" + ] + } + } + } + }, + "additionalProperties": false + } +} diff --git a/maestrowf/datastructures/yamlspecification.py b/maestrowf/datastructures/yamlspecification.py index c66d2e6c3..57fe87aa5 100644 --- a/maestrowf/datastructures/yamlspecification.py +++ b/maestrowf/datastructures/yamlspecification.py @@ -30,18 +30,32 @@ """Module containing all things needed for a YAML Study Specification.""" from copy import deepcopy +import json import logging +import os +import re import yaml +import jsonschema + from maestrowf.abstracts import Specification -from maestrowf.datastructures.core import ParameterGenerator, \ - StudyEnvironment, \ - StudyStep +from maestrowf.datastructures.core import ( + ParameterGenerator, + StudyEnvironment, + StudyStep, +) from maestrowf.datastructures import environment logger = logging.getLogger(__name__) +# load the schemas.json file +dirpath = os.path.dirname(os.path.abspath(__file__)) +schemas_file = os.path.join(dirpath, "schemas.json") +with open(schemas_file, "r") as json_file: + schemas = json.load(json_file) + + class YAMLSpecification(Specification): """ Class for loading and verifying a Study Specification. @@ -94,12 +108,12 @@ def load_specification(cls, path): logger.info("Loading specification -- path = %s", path) try: # Load the YAML spec from the file. - with open(path, 'r') as data: + with open(path, "r") as data: specification = cls.load_specification_from_stream(data) except Exception as e: logger.exception(e.args) - raise + raise e # Populate the path to the specification that populated this instance. specification.path = path @@ -121,18 +135,18 @@ def load_specification_from_stream(cls, stream): logger.warning( "*** PyYAML is using an unsafe version with a known " "load vulnerability. Please upgrade your installation " - "to a more recent version! ***") + "to a more recent version! ***" + ) spec = yaml.load(stream) logger.debug("Loaded specification -- \n%s", spec["description"]) specification = cls() specification.path = None specification.description = spec.pop("description", {}) - specification.environment = spec.pop("env", - {'variables': {}, - 'sources': [], - 'labels': {}, - 'dependencies': {}}) + specification.environment = spec.pop( + "env", + {"variables": {}, "sources": [], "labels": {}, "dependencies": {}}, + ) specification.batch = spec.pop("batch", {}) specification.study = spec.pop("study", []) specification.globals = spec.pop("global.parameters", {}) @@ -145,11 +159,13 @@ def load_specification_from_stream(cls, stream): def verify(self): """Verify the whole specification.""" self.verify_description() + self.verify_environment() self.verify_study() self.verify_parameters() - logger.info("Specification %s - Verified. No apparent issues.", - self.name) + logger.debug( + "Specification %s - Verified. 
No apparent issues.", self.name + ) def verify_description(self): """ @@ -162,21 +178,13 @@ def verify_description(self): # and study. # We're REQUIRING that user specify a name and description for the # study. - try: - if not self.description: - raise ValueError("The 'description' key is required in the " - "YAML study for user sanity. Provide a " - "description.") - else: - if not (self.description["name"] and - self.description["description"]): - raise ValueError("Both 'name' and 'description' must be " - "provided for a valid study description.") - except Exception as e: - logger.exception(e.args) - raise - logger.info("Study description verified -- \n%s", self.description) + # validate description against json schema + YAMLSpecification.validate_schema( + "description", self.description, schemas["DESCRIPTION"] + ) + + logger.debug("Study description verified -- \n%s", self.description) def _verify_variables(self): """ @@ -189,23 +197,31 @@ def _verify_variables(self): :returns: A set of keys encountered in the variables section. """ keys_seen = set() - for key, value in self.environment['variables'].items(): + if "variables" not in self.environment: + return keys_seen + for key, value in self.environment["variables"].items(): logger.debug("Verifying %s...", key) if not key: - msg = "All variables must have a valid name. Empty strings " \ - "are not allowed." + msg = ( + "All variables must have a valid name. Empty strings " + "are not allowed." + ) logger.error(msg) raise ValueError(msg) if not value: - msg = "All variables must have a valid value. Empty strings " \ - "are not allowed." + msg = ( + "All variables must have a valid value. Empty strings " + "are not allowed." + ) logger.error(msg) raise ValueError(msg) - if key in self.keys_seen: - msg = "Variable name '{}' is already taken. All variable " \ - "names must be unique.".format(key) + if key in keys_seen: + msg = ( + "Variable name '{}' is already taken. All variable " + "names must be unique.".format(key) + ) logger.error(msg) raise ValueError(msg) @@ -230,45 +246,25 @@ def _verify_dependencies(self, keys_seen): :returns: A set of variable names seen. """ dep_types = ["path", "git", "spack"] - # Required keys - req_keys = {} - # For each PathDependency, we require two things: - # 1. A unique name (which will be its variable name for substitution) - # 2. A path. - req_keys["path"] = set(["name", "path"]) - - # For each GitDependency, required items are: - # 1. A name for the dependency (variable name to be substituted) - # 2. A git URL (ssh or http) - # 3. A path to store the repository to. - req_keys["git"] = set(["name", "path", "url"]) - - # For each SpackDependency, required items are: - # 1. A name for the dependency (variable name to be substituted) - # 2. Spack package name - req_keys["spack"] = set(["name", "package_name"]) + + if "dependencies" not in self.environment: + return keys_seen # For each dependency type, run through the required keys and name. for dep_type in dep_types: if dep_type in self.environment["dependencies"]: for item in self.environment["dependencies"][dep_type]: - # Check that the name and path attributes are populated. - missing_keys = req_keys[dep_type] - set(item.keys()) - if missing_keys: - msg = "Incomplete %s dependency detected -- missing" \ - " %s required keys. Value: %s" \ - .format(dep_type, missing_keys, item) - logger.error(msg) - raise ValueError(msg) - # Make sure that the "name" attribute is not taken. 
# Because every dependency should be responsible for # substituting itself into data, they are required to have # a name field. if item["name"] in keys_seen: - msg = "Variable name '{}' is already taken. All " \ - "variable names must be unique." \ - .format(item["name"]) + msg = ( + "Variable name '{}' is already taken. All " + "variable names must be unique.".format( + item["name"] + ) + ) logger.error(msg) raise ValueError(msg) @@ -278,6 +274,10 @@ def _verify_dependencies(self, keys_seen): def verify_environment(self): """Verify that the environment in a specification is valid.""" + # validate environment against json schema + YAMLSpecification.validate_schema( + "env", self.environment, schemas["ENV"] + ) # Verify the variables section of the specification. keys_seen = self._verify_variables() # Verify the sources section of the specification. @@ -291,11 +291,14 @@ def verify_study(self): # not a workflow... try: if not self.study: - raise ValueError("A study specification MUST contain at least " - "one step in its workflow.") - - logger.debug("Verified that a study block exists. -- verifying " - "steps.") + raise ValueError( + "A study specification MUST contain at least " + "one step in its workflow." + ) + + logger.debug( + "Verified that a study block exists. -- verifying " "steps." + ) self._verify_steps() except Exception as e: @@ -309,35 +312,20 @@ def _verify_steps(self): A study step is required to have a name, description, and a command. If any are missing, the specification is considered invalid. """ - # Verify that each step has the minimum required information. - # Each step in the 'study' section must at least specify three things. - # 1. name - # 2. description - # 3. run try: - req_study = set(["name", "description", "run"]) - req_run = set(["cmd"]) for step in self.study: - logger.debug("Verifying -- \n%s" % step) - # Missing attributes in a study step. - missing_attrs = req_study - set(step.keys()) - if missing_attrs: - raise ValueError("Missing required keys {} from study step" - " containing following: {}" - .format(missing_attrs, step)) - - # Each step's 'run' requires a command and dependency. - # Missing keys in the 'run' attribute of a step. - missing_attrs = req_run - set(step["run"].keys()) - if missing_attrs: - raise ValueError("Missing {} keys from the run " - "configuration for step named '{}'." - .format(missing_attrs, step["name"])) + # validate step against json schema + YAMLSpecification.validate_schema( + "study.{}".format(step["name"]), + step, + schemas["STUDY_STEP"], + ) + except Exception as e: logger.exception(e.args) raise - logger.debug("Verified") + logger.debug("Verified steps") def verify_parameters(self): """ @@ -357,36 +345,39 @@ def verify_parameters(self): """ try: if self.globals: - req_global = set(["values", "label"]) global_names = set() values_len = -1 for name, value in self.globals.items(): # Check if the name is in the set if name in global_names: - raise ValueError("Parameter '{}' is not unique in the " - "set of global parameters." - .format(name)) - - # Check to make sure the required info is in the parameter. 
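# --- Editor's note (illustrative sketch, not part of the patch) ---------------
# A minimal, standalone version of the schema-driven check that replaces the
# manual required-key tests above. The parameter block is hypothetical; the
# schema shape mirrors the "PARAM" entry in schemas.json.
import jsonschema

PARAM_SCHEMA = {
    "type": "object",
    "properties": {
        "values": {"type": "array"},
        "label": {"type": "string", "minLength": 1},
    },
    "additionalProperties": False,
    "required": ["values", "label"],
}
block = {"values": [10, 20, 30], "label": "TRIAL.%%"}
# Raises jsonschema.ValidationError if a required key is missing, a value has
# the wrong type, or an unrecognized key is present.
jsonschema.Draft7Validator(PARAM_SCHEMA).validate(block)
# ------------------------------------------------------------------------------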
- missing_attrs = req_global - set(value.keys()) - if missing_attrs: - raise ValueError("Missing {} keys in the global " - "parameter named {}" - .format(missing_attrs, name)) + raise ValueError( + "Parameter '{}' is not unique in the " + "set of global parameters.".format(name) + ) + + # validate parameters against json schema + YAMLSpecification.validate_schema( + "global.params.{}".format(name), + value, + schemas["PARAM"], + ) + # If label is a list, check its length against values. values = value["values"] label = value["label"] if isinstance(label, list): if len(values) != len(label): - raise ValueError("Global parameter '{}' the " - "values length does not " - "match the label list length." - .format(name)) + raise ValueError( + "Global parameter '{}' the " + "values length does not " + "match the label list length.".format(name) + ) if len(label) != len(set(label)): - raise ValueError("Global parameter '{}' the " - "label does not contain " - "unique labels." - .format(name)) + raise ValueError( + "Global parameter '{}' the " + "label does not contain " + "unique labels.".format(name) + ) # Add the name to global parameters encountered, check if # length of values is the same as previously encountered. global_names.add(name) @@ -397,14 +388,75 @@ def verify_parameters(self): # Check length. Exception if doesn't match. if len(values) != values_len: - raise ValueError("Global parameter '{}' is not the " - "same length as other parameters." - .format(name)) + raise ValueError( + "Global parameter '{}' is not the " + "same length as other parameters.".format(name) + ) except Exception as e: logger.exception(e.args) raise + @staticmethod + def validate_schema(parent_key, instance, schema): + """ + Given a parent key, an instance of a spec section, and a json schema + for that section, validate the instance against the schema. + """ + validator = jsonschema.Draft7Validator(schema) + errors = validator.iter_errors(instance) + for error in errors: + if error.validator == "additionalProperties": + unrecognized = ( + re.search(r"'.+'", error.message).group(0).strip("'") + ) + raise jsonschema.ValidationError( + "Unrecognized key '{0}' found in spec section '{1}'." + .format(unrecognized, parent_key) + ) + + elif error.validator == "type": + bad_val = ( + re.search(r".+ is not of type", error.message) + .group(0) + .strip(" is not of type") + ) + expected_type = ( + re.search(r"is not of type '.+'", error.message) + .group(0) + .strip("is not of type ") + .strip("'") + ) + raise jsonschema.ValidationError( + "Value {0} in spec section '{1}' must be of type '{2}'." + .format(bad_val, parent_key, expected_type) + ) + + elif error.validator == "required": + missing = re.search(r"'.+'", error.message) + missing = missing.group(0) + missing = missing.strip("'") + raise jsonschema.ValidationError( + "Key '{0}' is missing from spec section '{1}'.".format( + missing, parent_key + ) + ) + + elif error.validator == "uniqueItems": + raise jsonschema.ValidationError( + "Non-unique step names in spec section '{0}.run.depends'." + .format(parent_key) + ) + + elif error.validator == "minLength": + raise jsonschema.ValidationError( + "Empty string found in value in spec section '{0}'." 
+ .format(parent_key) + ) + + else: + raise ValueError("Unknown validation error: " + error.message) + @property def output_path(self): """ @@ -414,8 +466,9 @@ def output_path(self): """ if "variables" in self.environment: if "OUTPUT_PATH" in self.environment["variables"]: - logger.debug("OUTPUT_PATH found in %s.", - self.description["name"]) + logger.debug( + "OUTPUT_PATH found in %s.", self.description["name"] + ) return self.environment["variables"]["OUTPUT_PATH"] else: return "" @@ -484,7 +537,7 @@ def get_study_environment(self): if "dependencies" in self.environment: if "paths" in self.environment["dependencies"]: for path in self.environment["dependencies"]["paths"]: - _ = environment.PathDependency(path['name'], path['path']) + _ = environment.PathDependency(path["name"], path["path"]) env.add(_) if "git" in self.environment["dependencies"]: @@ -493,8 +546,9 @@ def get_study_environment(self): optionals.pop("name") optionals.pop("url") optionals.pop("path") - _ = environment.GitDependency(repo["name"], repo["url"], - repo["path"], **optionals) + _ = environment.GitDependency( + repo["name"], repo["url"], repo["path"], **optionals + ) env.add(_) return env @@ -510,8 +564,9 @@ def get_parameters(self): if "name" not in value: params.add_parameter(key, value["values"], value["label"]) else: - params.add_parameter(key, value["values"], value["label"], - value["name"]) + params.add_parameter( + key, value["values"], value["label"], value["name"] + ) return params @@ -524,9 +579,9 @@ def get_study_steps(self): steps = [] for step in self.study: _ = StudyStep() - _.name = step['name'] - _.description = step['description'] - for key, value in step['run'].items(): + _.name = step["name"] + _.description = step["description"] + for key, value in step["run"].items(): _.run[key] = value steps.append(_) diff --git a/maestrowf/interfaces/script/localscriptadapter.py b/maestrowf/interfaces/script/localscriptadapter.py index b6e2198e9..911ed4cf2 100644 --- a/maestrowf/interfaces/script/localscriptadapter.py +++ b/maestrowf/interfaces/script/localscriptadapter.py @@ -137,8 +137,8 @@ def submit(self, step, path, cwd, job_map=None, env=None): output, err = p.communicate() retcode = p.wait() - o_path = os.path.join(cwd, "{}.out".format(step.name)) - e_path = os.path.join(cwd, "{}.err".format(step.name)) + o_path = os.path.join(cwd, "{}.{}.out".format(step.name, pid)) + e_path = os.path.join(cwd, "{}.{}.err".format(step.name, pid)) with open(o_path, "w") as out: out.write(output) diff --git a/maestrowf/interfaces/script/lsfscriptadapter.py b/maestrowf/interfaces/script/lsfscriptadapter.py new file mode 100644 index 000000000..d18af67c4 --- /dev/null +++ b/maestrowf/interfaces/script/lsfscriptadapter.py @@ -0,0 +1,410 @@ +############################################################################### +# Copyright (c) 2017, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory +# Written by Francesco Di Natale, dinatale3@llnl.gov. +# +# LLNL-CODE-734340 +# All rights reserved. +# This file is part of MaestroWF, Version: 1.0.0. +# +# For details, see https://github.com/LLNL/maestrowf. 
+# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +############################################################################### + +"""LSF Scheduler interface implementation.""" +import getpass +import logging +from math import ceil +import os +import re +from subprocess import PIPE, Popen + +from maestrowf.abstracts.interfaces import SchedulerScriptAdapter +from maestrowf.abstracts.enums import CancelCode, JobStatusCode, State, \ + SubmissionCode +from maestrowf.interfaces.script import CancellationRecord, SubmissionRecord + + +LOGGER = logging.getLogger(__name__) + + +class LSFScriptAdapter(SchedulerScriptAdapter): + """A ScriptAdapter class for interfacing with the LSF cluster scheduler.""" + + NOJOB_REGEX = re.compile(r"^No\s") + key = "lsf" + + def __init__(self, **kwargs): + """ + Initialize an instance of the SlurmScriptAdapter. + + The SlurmScriptAdapter is this package's interface to the Slurm + scheduler. This adapter constructs Slurm scripts for a StudyStep based + on user set defaults and local settings present in each step. + + The expected keyword arguments that are expected when the Slurm adapter + is instantiated are as follows: + - host: The cluster to execute scripts on. + - bank: The account to charge computing time to. + - queue: Scheduler queue scripts should be submitted to. + - tasks: The number of compute nodes to be reserved for computing. + + :param **kwargs: A dictionary with default settings for the adapter. + """ + super(LSFScriptAdapter, self).__init__() + + # NOTE: Host doesn't seem to matter for SLURM. sbatch assumes that the + # current host is where submission occurs. + self.add_batch_parameter("host", kwargs.pop("host")) + self.add_batch_parameter("bank", kwargs.pop("bank")) + self.add_batch_parameter("queue", kwargs.pop("queue")) + self.add_batch_parameter("nodes", kwargs.pop("nodes", "1")) + + self._exec = "#!/bin/bash" + self._header = { + "nodes": "#BSUB -nnodes {nodes}", + "queue": "#BSUB -q {queue}", + "bank": "#BSUB -G {bank}", + "walltime": "#BSUB -W {walltime}", + "job-name": "#BSUB -J {job-name}", + "output": "#BSUB -o {output}", + "error": "#BSUB -e {error}", + } + + self._cmd_flags = { + "cmd": "jsrun --bind rs", + "ntasks": "--tasks_per_rs {procs} --cpu_per_rs {procs}", + "nodes": "--nrs", + "gpus": "-g", + "reservation": "-J", + } + + def get_header(self, step): + """ + Generate the header present at the top of LSF execution scripts. + + :param step: A StudyStep instance. 
+ :returns: A string of the header based on internal batch parameters and + the parameter step. + """ + run = dict(step.run) + batch_header = dict(self._batch) + batch_header["nodes"] = run.pop("nodes", self._batch["nodes"]) + batch_header["job-name"] = step.name.replace(" ", "_") + batch_header["output"] = "{}.%J.out".format(batch_header["job-name"]) + batch_header["error"] = "{}.%J.err".format(batch_header["job-name"]) + + # LSF requires an hour and minutes format. We need to attempt to split + # and correct if we get something that's coming in as HH:MM:SS + walltime = run.pop("walltime") + wt_split = walltime.split(":") + if len(wt_split) == 3: + # If wall time is specified in three parts, we'll just calculate + # the minutes off of the seconds and then shift up to hours if + # needed. + seconds_minutes = ceil(float(wt_split[2])/60) + total_minutes = int(wt_split[1]) + seconds_minutes + hours = int(wt_split[0]) + int(total_minutes/60) + total_minutes %= 60 + walltime = "{:02d}:{:02d}".format(hours, int(total_minutes)) + + batch_header["walltime"] = walltime + + modified_header = [self._exec] + for key, value in self._header.items(): + modified_header.append(value.format(**batch_header)) + + return "\n".join(modified_header) + + def get_parallelize_command(self, procs, nodes=None, **kwargs): + """ + Generate the LSF parallelization segement of the command line. + + :param procs: Number of processors to allocate to the parallel call. + :param nodes: Number of nodes to allocate to the parallel call + (default = 1). + :returns: A string of the parallelize command configured using nodes + and procs. + """ + args = [self._cmd_flags["cmd"]] + + if nodes: + _nodes = nodes + args += [ + self._cmd_flags["nodes"], + str(nodes) + ] + else: + _nodes = 1 + + _procs = int(procs/_nodes) # Compute the number of CPUs per node (rs) + # Processors segment + args += [ + self._cmd_flags["ntasks"].format(procs=_procs) + ] + + # If we have GPUs being requested, add them to the command. + gpus = kwargs.get("gpus", 0) + if gpus: + args += [ + self._cmd_flags["gpus"], + str(gpus) + ] + + return " ".join(args) + + def submit(self, step, path, cwd, job_map=None, env=None): + """ + Submit a script to the LSF scheduler. + + :param step: The StudyStep instance this submission is based on. + :param path: Local path to the script to be executed. + :param cwd: Path to the current working directory. + :param job_map: A dictionary mapping step names to their job + identifiers. + :param env: A dict containing a modified environment for execution. + :returns: The return status of the submission command and job + identiifer. + """ + args = ["bsub"] + + if "reservation" in self._batch: + args += [ + "-U", + self._batch["reservation"] + ] + + args += ["-cwd", cwd, "<", path] + cmd = " ".join(args) + LOGGER.debug("cwd = %s", cwd) + LOGGER.debug("Command to execute: %s", cmd) + p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE, cwd=cwd, env=env) + output, err = p.communicate() + retcode = p.wait() + output = output.decode("utf-8") + + # TODO: We need to check for dependencies here. The sbatch is where + # dependent batch jobs are specified. If we're trying to launch + # everything at once then that should happen here. 
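# --- Editor's note (illustrative sketch, not part of the patch) ---------------
# Worked example of the HH:MM:SS -> HH:MM walltime conversion performed in
# LSFScriptAdapter.get_header() above. The standalone helper wrapper is
# hypothetical; the arithmetic mirrors the adapter's code.
from math import ceil

def to_lsf_walltime(walltime):
    wt_split = walltime.split(":")
    if len(wt_split) != 3:
        return walltime
    # Round seconds up into minutes, then carry whole hours out of the minutes.
    seconds_minutes = ceil(float(wt_split[2]) / 60)
    total_minutes = int(wt_split[1]) + seconds_minutes
    hours = int(wt_split[0]) + int(total_minutes / 60)
    total_minutes %= 60
    return "{:02d}:{:02d}".format(hours, int(total_minutes))

print(to_lsf_walltime("01:59:30"))  # -> "02:00"
# ------------------------------------------------------------------------------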
+ + if retcode == 0: + LOGGER.info("Submission returned status OK.") + return SubmissionRecord( + SubmissionCode.OK, retcode, + re.search('[0-9]+', output).group(0)) + else: + LOGGER.warning("Submission returned an error.") + return SubmissionRecord(SubmissionCode.ERROR, retcode, -1) + + def check_jobs(self, joblist): + """ + For the given job list, query execution status. + + This method uses the scontrol show job command and does a + regex search for job information. + + :param joblist: A list of job identifiers to be queried. + :returns: The return code of the status query, and a dictionary of job + identifiers to their status. + """ + # TODO: This method needs to be updated to use sacct. + # squeue options: + # -u = username to search queues for. + # -t = list of job states to search for. 'all' for all states. + # -o = status output formatting + o_format = "jobid:7 stat:5 exit_code:10 exit_reason:50 delimiter='|'" + stat_cmd = "bjobs -a -u $USER -o \"{}\"" + cmd = stat_cmd.format(o_format) + LOGGER.debug("bjobs cmd = \"%s\"", cmd) + p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE) + output, err = p.communicate() + retcode = p.wait() + output = output.decode("utf-8") + + status = {} + for jobid in joblist: + LOGGER.debug("Looking for jobid %s", jobid) + status[jobid] = None + + state_index = 1 + jobid_index = 0 + term_reason = 3 + if retcode == 0: + # It seems that LSF may still return 0 even if it found nothing. + # We'll explicitly check for a "^No " regex in the event that the + # system is configured to return 0. + no_jobs = re.search(self.NOJOB_REGEX, output) + if no_jobs: + LOGGER.warning("User '%s' has no jobs executing. Returning.", + getpass.getuser()) + return JobStatusCode.NOJOBS, {} + + # Otherwise, we can just process as normal. + for job in output.split("\n")[1:]: + LOGGER.debug("Job Entry: %s", job) + # The squeue command output is split with the following indices + # used for specific information: + # 0 - Job Identifier + # 1 - Status of the job + # 2 - Exit code application terminated with + # 3 - Reason for termination (if applicable) + job_split = [x.strip() for x in job.split("|")] + LOGGER.debug("Entry split: %s", job_split) + if len(job_split) < 4: + LOGGER.debug( + "Entry has less than 4 fields. Skipping.", + job_split) + continue + + while job_split[0] == "": + LOGGER.debug("Removing blank entry from head of status.") + job_split = job_split[1:] + + if not job_split: + LOGGER.debug("Continuing...") + continue + + if job_split[jobid_index] in status: + if job_split[state_index] == "EXIT": + if "TERM_RUNLIMIT" in job_split[term_reason]: + _j_state = "TIMEOUT" + elif "TERM_OWNER" in job_split[term_reason]: + _j_state = "CANCELLED" + else: + _j_state = job_split[state_index] + else: + _j_state = job_split[state_index] + _state = self._state(_j_state) + LOGGER.debug("ID Found. %s -- %s", + job_split[state_index], + _state) + status[job_split[jobid_index]] = _state + + return JobStatusCode.OK, status + # NOTE: We're keeping this here for now since we could see it in the + # future... + elif retcode == 255: + LOGGER.warning("User '%s' has no jobs executing. Returning.", + getpass.getuser()) + return JobStatusCode.NOJOBS, status + else: + LOGGER.error("Error code '%s' seen. Unexpected behavior " + "encountered.", retcode) + return JobStatusCode.ERROR, status + + def cancel_jobs(self, joblist): + """ + For the given job list, cancel each job. + + :param joblist: A list of job identifiers to be cancelled. + :returns: The return code to indicate if jobs were cancelled. 
+ """ + # If we don't have any jobs to check, just return status OK. + if not joblist: + return CancelCode.OK + + cmd = "bkill {}".format(" ".join(joblist)) + p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE) + output, err = p.communicate() + retcode = p.wait() + + if retcode == 0: + return CancellationRecord(CancelCode.OK, retcode) + else: + LOGGER.error("Error code '%s' seen. Unexpected behavior " + "encountered.", retcode) + return CancellationRecord(CancelCode.ERROR, retcode) + + def _state(self, lsf_state): + """ + Map a scheduler specific job state to a Study.State enum. + + :param slurm_state: String representation of scheduler job status. + :returns: A Study.State enum corresponding to parameter job_state. + """ + # NOTE: fdinatale -- If I'm understanding this correctly, there are + # four naturally occurring states (excluding states of suspension.) + # This is somewhat problematic because we don't actually get a time out + # status here. We probably need to start considering what to do with + # the post and pre monikers in steps. + LOGGER.debug("Received LSF State -- %s", lsf_state) + if lsf_state == "RUN": + return State.RUNNING + elif lsf_state == "PEND": + return State.PENDING + elif lsf_state == "DONE": + return State.FINISHED + elif lsf_state == "CANCELLED": + return State.CANCELLED + elif lsf_state == "EXIT": + return State.FAILED + elif lsf_state == "TIMEOUT": + return State.TIMEDOUT + elif lsf_state == "WAIT" or lsf_state == "PROV": + return State.WAITING + elif lsf_state == "UNKWN": + return State.UNKNOWN + else: + return State.UNKNOWN + + def _write_script(self, ws_path, step): + """ + Write a LSF script to the workspace of a workflow step. + + The job_map optional parameter is a map of workflow step names to job + identifiers. This parameter so far is only planned to be used when a + study is configured to be launched in one go (more or less a script + chain using a scheduler's dependency setting). The functionality of + the parameter may change depending on both future intended use. + + :param ws_path: Path to the workspace directory of the step. + :param step: An instance of a StudyStep. + :returns: Boolean value (True if to be scheduled), the path to the + written script for run["cmd"], and the path to the script written for + run["restart"] (if it exists). + """ + to_be_scheduled, cmd, restart = self.get_scheduler_command(step) + + fname = "{}.lsf.cmd".format(step.name) + script_path = os.path.join(ws_path, fname) + with open(script_path, "w") as script: + if to_be_scheduled: + script.write(self.get_header(step)) + else: + script.write(self._exec) + + cmd = "\n\n{}\n".format(cmd) + script.write(cmd) + + if restart: + rname = "{}.restart.lsf.cmd".format(step.name) + restart_path = os.path.join(ws_path, rname) + + with open(restart_path, "w") as script: + if to_be_scheduled: + script.write(self.get_header(step)) + else: + script.write(self._exec) + + cmd = "\n\n{}\n".format(restart) + script.write(cmd) + else: + restart_path = None + + return to_be_scheduled, script_path, restart_path diff --git a/maestrowf/interfaces/script/slurmscriptadapter.py b/maestrowf/interfaces/script/slurmscriptadapter.py index 0643e9db8..cef122203 100644 --- a/maestrowf/interfaces/script/slurmscriptadapter.py +++ b/maestrowf/interfaces/script/slurmscriptadapter.py @@ -32,6 +32,11 @@ import logging import os import re +# In order to support Python2.7, we need to catch the import error. 
+try: + from collections import ChainMap +except ImportError: + from chainmap import ChainMap from maestrowf.abstracts.interfaces import SchedulerScriptAdapter from maestrowf.abstracts.enums import JobStatusCode, State, SubmissionCode, \ @@ -74,13 +79,24 @@ def __init__(self, **kwargs): self.add_batch_parameter("nodes", kwargs.pop("nodes", "1")) self.add_batch_parameter("reservation", kwargs.pop("reservation", "")) + # Check for procs separately, as we don't want it in the header if it's + # not present. + procs = kwargs.get("procs", None) + if procs: + self.add_batch_parameter("procs", procs) + self._header = { - "nodes": "#SBATCH -N {nodes}", - "queue": "#SBATCH -p {queue}", - "bank": "#SBATCH -A {bank}", - "walltime": "#SBATCH -t {walltime}", - "job-name": "#SBATCH -J {job-name}", - "comment": "#SBATCH --comment \"{comment}\"" + "nodes": "#SBATCH --nodes={nodes}", + "queue": "#SBATCH --partition={queue}", + "bank": "#SBATCH --account={bank}", + "walltime": "#SBATCH --time={walltime}", + "job-name": + "#SBATCH --job-name=\"{job-name}\"\n" + "#SBATCH --output=\"{job-name}.out\"\n" + "#SBATCH --error=\"{job-name}.err\"", + "comment": "#SBATCH --comment \"{comment}\"", + "reservation": "#SBATCH --reservation=\"{reservation}\"", + "gpus": "#SBATCH --gres=gpu:{gpus}" } self._cmd_flags = { @@ -88,7 +104,6 @@ def __init__(self, **kwargs): "depends": "--dependency", "ntasks": "-n", "nodes": "-N", - "reservation": "--reservation", "cores per task": "-c", } self._unsupported = set(["cmd", "depends", "ntasks", "nodes"]) @@ -101,22 +116,26 @@ def get_header(self, step): :returns: A string of the header based on internal batch parameters and the parameter step. """ - run = dict(step.run) - batch_header = dict(self._batch) - batch_header["walltime"] = run.pop("walltime") - if run["nodes"]: - batch_header["nodes"] = run.pop("nodes") - batch_header["job-name"] = step.name.replace(" ", "_") - batch_header["comment"] = step.description.replace("\n", " ") + resources = ChainMap(step.run, self._batch) + resources["job-name"] = step.name.replace(" ", "_") + resources["comment"] = step.description.replace("\n", " ") modified_header = ["#!{}".format(self._exec)] for key, value in self._header.items(): - # If we're looking at the bank and the reservation header exists, - # skip the bank to prefer the reservation. 
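# --- Editor's note (illustrative sketch, not part of the patch) ---------------
# The ChainMap lookup used in the Slurm get_header() above: per-step settings
# in step.run take precedence over the batch-block defaults. The dictionaries
# below are hypothetical.
from collections import ChainMap

batch = {"nodes": "1", "queue": "pbatch", "bank": "science"}
step_run = {"nodes": 2, "walltime": "00:10:00"}
resources = ChainMap(step_run, batch)
print(resources["nodes"], resources["queue"])  # -> 2 pbatch
# ------------------------------------------------------------------------------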
- if key == "bank" and "reservation" in self._batch: - if self._batch["reservation"]: - continue - modified_header.append(value.format(**batch_header)) + if key not in resources: + continue + + if resources[key]: + modified_header.append(value.format(**resources)) + + if "procs" in self._batch: + modified_header.append( + "#SBATCH --ntasks={}".format(resources["procs"]) + ) + + exclusive = resources.get("exclusive", False) + if exclusive: + modified_header.append("#SBATCH --exclusive") return "\n".join(modified_header) @@ -197,7 +216,8 @@ def submit(self, step, path, cwd, job_map=None, env=None): jid = re.search('[0-9]+', output).group(0) return SubmissionRecord(SubmissionCode.OK, retcode, jid) else: - LOGGER.warning("Submission returned an error.") + LOGGER.warning( + "Submission returned an error (see next line).\n%s", err) return SubmissionRecord(SubmissionCode.ERROR, retcode) def check_jobs(self, joblist): diff --git a/maestrowf/maestro.py b/maestrowf/maestro.py index 9c29c6559..0daf23a79 100644 --- a/maestrowf/maestro.py +++ b/maestrowf/maestro.py @@ -29,8 +29,7 @@ """A script for launching a YAML study specification.""" from argparse import ArgumentParser, ArgumentError, RawTextHelpFormatter -from filelock import FileLock, Timeout -import inspect +import jsonschema import logging import os import shutil @@ -40,62 +39,58 @@ import time from maestrowf import __version__ -from maestrowf.conductor import monitor_study +from maestrowf.conductor import Conductor from maestrowf.datastructures import YAMLSpecification from maestrowf.datastructures.core import Study from maestrowf.datastructures.environment import Variable from maestrowf.utils import \ - create_parentdir, create_dictionary, csvtable_to_dict, make_safe_path, \ + create_parentdir, create_dictionary, LoggerUtility, make_safe_path, \ start_process # Program Globals -ROOTLOGGER = logging.getLogger(inspect.getmodule(__name__)) LOGGER = logging.getLogger(__name__) +LOG_UTIL = LoggerUtility(LOGGER) # Configuration globals -LFORMAT = "%(asctime)s - %(name)s:%(funcName)s:%(lineno)s - " \ - "%(levelname)s - %(message)s" +DEBUG_FORMAT = "[%(asctime)s: %(levelname)s] " \ + "[%(module)s: %(lineno)d] %(message)s" +LFORMAT = "[%(asctime)s: %(levelname)s] %(message)s" ACCEPTED_INPUT = set(["yes", "y"]) def status_study(args): """Check and print the status of an executing study.""" - study_path = args.directory - stat_path = os.path.join(study_path, "status.csv") - lock_path = os.path.join(study_path, ".status.lock") - if os.path.exists(stat_path): - lock = FileLock(lock_path) - try: - with lock.acquire(timeout=10): - with open(stat_path, "r") as stat_file: - _ = csvtable_to_dict(stat_file) - print(tabulate.tabulate(_, headers="keys")) - except Timeout: - pass + status = Conductor.get_status(args.directory) - return 0 + if status: + print(tabulate.tabulate(status, headers="keys")) + return 0 + else: + print( + "Status check failed. If the issue persists, please verify that" + "you are passing in a path to a study.") + return 1 def cancel_study(args): """Flag a study to be cancelled.""" if not os.path.isdir(args.directory): + print("Attempted to cancel a path that was not a directory.") return 1 - lock_path = os.path.join(args.directory, ".cancel.lock") - - with open(lock_path, 'a'): - os.utime(lock_path, None) + Conductor.mark_cancelled(args.directory) return 0 -def load_parameter_generator(path, kwargs): +def load_parameter_generator(path, env, kwargs): """ Import and load custom parameter Python files. 
:param path: Path to a Python file containing the function \ 'get_custom_generator'. + :param env: A StudyEnvironment object containing custom information. :param kwargs: Dictionary containing keyword arguments for the function \ 'get_custom_generator'. :returns: A populated ParameterGenerator instance. @@ -109,20 +104,20 @@ def load_parameter_generator(path, kwargs): spec = importlib.util.spec_from_file_location("custom_gen", path) f = importlib.util.module_from_spec(spec) spec.loader.exec_module(f) - return f.get_custom_generator(**kwargs) + return f.get_custom_generator(env, **kwargs) except ImportError: try: # Python 3.3 from importlib.machinery import SourceFileLoader LOGGER.debug("Using Python 3.4 SourceFileLoader...") f = SourceFileLoader("custom_gen", path).load_module() - return f.get_custom_generator(**kwargs) + return f.get_custom_generator(env, **kwargs) except ImportError: # Python 2 import imp LOGGER.debug("Using Python 2 imp library...") f = imp.load_source("custom_gen", path) - return f.get_custom_generator(**kwargs) + return f.get_custom_generator(env, **kwargs) except Exception as e: LOGGER.exception(str(e)) raise e @@ -131,7 +126,11 @@ def load_parameter_generator(path, kwargs): def run_study(args): """Run a Maestro study.""" # Load the Specification - spec = YAMLSpecification.load_specification(args.specification) + try: + spec = YAMLSpecification.load_specification(args.specification) + except jsonschema.ValidationError as e: + LOGGER.error(e.message) + sys.exit(1) environment = spec.get_study_environment() steps = spec.get_study_steps() @@ -174,8 +173,10 @@ def run_study(args): output_path = make_safe_path(out_dir, *[out_name]) environment.add(Variable("OUTPUT_PATH", output_path)) - # Now that we know outpath, set up logging. - setup_logging(args, output_path, spec.name.replace(" ", "_").lower()) + # Set up file logging + create_parentdir(os.path.join(output_path, "logs")) + log_path = os.path.join(output_path, "logs", "{}.log".format(spec.name)) + LOG_UTIL.add_file_handler(log_path, LFORMAT, args.debug_lvl) # Check for pargs without the matching pgen if args.pargs and not args.pgen: @@ -183,6 +184,11 @@ def run_study(args): LOGGER.exception(msg) raise ArgumentError(msg) + # Addition of the $(SPECROOT) to the environment. + spec_root = os.path.split(args.specification)[0] + spec_root = Variable("SPECROOT", os.path.abspath(spec_root)) + environment.add(spec_root) + # Handle loading a custom ParameterGenerator if specified. if args.pgen: # 'pgen_args' has a default of an empty list, which should translate @@ -190,15 +196,16 @@ def run_study(args): kwargs = create_dictionary(args.pargs) # Copy the Python file used to generate parameters. shutil.copy(args.pgen, output_path) - parameters = load_parameter_generator(args.pgen, kwargs) + + # Add keywords and environment from the spec to pgen args. + kwargs["OUTPUT_PATH"] = output_path + kwargs["SPECROOT"] = spec_root + + # Load the parameter generator. + parameters = load_parameter_generator(args.pgen, environment, kwargs) else: parameters = spec.get_parameters() - # Addition of the $(SPECROOT) to the environment. - spec_root = os.path.split(args.specification)[0] - spec_root = Variable("SPECROOT", os.path.abspath(spec_root)) - environment.add(spec_root) - # Setup the study. study = Study(spec.name, spec.description, studyenv=environment, parameters=parameters, steps=steps, out_path=output_path) @@ -226,37 +233,33 @@ def run_study(args): # Set up the study workspace and configure it for execution. 
study.setup_workspace() - study.setup_environment() study.configure_study( throttle=args.throttle, submission_attempts=args.attempts, - restart_limit=args.rlimit, use_tmp=args.usetmp, hash_ws=args.hashws) - - # Stage the study. - path, exec_dag = study.stage() - # Write metadata - study.store_metadata() + restart_limit=args.rlimit, use_tmp=args.usetmp, hash_ws=args.hashws, + dry_run=args.dry) + study.setup_environment() - if not spec.batch: - exec_dag.set_adapter({"type": "local"}) + if args.dry: + # If performing a dry run, drive sleep time down to generate scripts. + sleeptime = 1 else: - if "type" not in spec.batch: - spec.batch["type"] = "local" - - exec_dag.set_adapter(spec.batch) - + # else, use args to decide sleeptime + sleeptime = args.sleeptime + + batch = {"type": "local"} + if spec.batch: + batch = spec.batch + if "type" not in batch: + batch["type"] = "local" # Copy the spec to the output directory - shutil.copy(args.specification, path) - - # Check for a dry run - if args.dryrun: - raise NotImplementedError("The 'dryrun' mode is in development.") + shutil.copy(args.specification, study.output_path) - # Pickle up the DAG - pkl_path = make_safe_path(path, *["{}.pkl".format(study.name)]) - exec_dag.pickle(pkl_path) + # Use the Conductor's classmethod to store the study. + Conductor.store_study(study) + Conductor.store_batch(study.output_path, batch) # If we are automatically launching, just set the input as yes. - if args.autoyes: + if args.autoyes or args.dry: uinput = "y" elif args.autono: uinput = "n" @@ -267,22 +270,22 @@ def run_study(args): if args.fg: # Launch in the foreground. LOGGER.info("Running Maestro Conductor in the foreground.") - cancel_path = os.path.join(path, ".cancel.lock") - # capture the StudyStatus enum to return - completion_status = monitor_study(exec_dag, pkl_path, - cancel_path, args.sleeptime) + conductor = Conductor(study) + conductor.initialize(batch, sleeptime) + completion_status = conductor.monitor_study() + conductor.cleanup() return completion_status.value else: # Launch manager with nohup log_path = make_safe_path( study.output_path, - *["{}.txt".format(exec_dag.name)]) + *["{}.txt".format(study.name)]) cmd = ["nohup", "conductor", - "-t", str(args.sleeptime), + "-t", str(sleeptime), "-d", str(args.debug_lvl), - path, - "&>", log_path] + study.output_path, + ">", log_path, "2>&1"] LOGGER.debug(" ".join(cmd)) start_process(" ".join(cmd)) @@ -328,7 +331,7 @@ def setup_argparser(): run.add_argument("-s", "--sleeptime", type=int, default=60, help="Amount of time (in seconds) for the manager to " "wait between job status checks. [Default: %(default)d]") - run.add_argument("-d", "--dryrun", action="store_true", default=False, + run.add_argument("--dry", action="store_true", default=False, help="Generate the directory structure and scripts for a " "study but do not launch it. [Default: %(default)s]") run.add_argument("-p", "--pgen", type=str, @@ -399,49 +402,6 @@ def setup_argparser(): return parser -def setup_logging(args, path, name): - """ - Set up logging based on the ArgumentParser. - - :param args: A Namespace object created by a parsed ArgumentParser. - :param path: A default path to be used if a log path is not specified by - user command line arguments. - :param name: The name of the log file. - """ - # If the user has specified a path, use that. - if args.logpath: - logpath = args.logpath - # Otherwise, we should just output to the OUTPUT_PATH. 
- else: - logpath = make_safe_path(path, *["logs"]) - - loglevel = args.debug_lvl * 10 - - # Create the FileHandler and add it to the logger. - create_parentdir(logpath) - formatter = logging.Formatter(LFORMAT) - ROOTLOGGER.setLevel(loglevel) - - log_path = make_safe_path(logpath, *["{}.log".format(name)]) - fh = logging.FileHandler(log_path) - fh.setLevel(loglevel) - fh.setFormatter(formatter) - ROOTLOGGER.addHandler(fh) - - if args.logstdout: - # Add the StreamHandler - sh = logging.StreamHandler() - sh.setLevel(loglevel) - sh.setFormatter(formatter) - ROOTLOGGER.addHandler(sh) - - # Print the level of logging. - LOGGER.info("INFO Logging Level -- Enabled") - LOGGER.warning("WARNING Logging Level -- Enabled") - LOGGER.critical("CRITICAL Logging Level -- Enabled") - LOGGER.debug("DEBUG Logging Level -- Enabled") - - def main(): """ Execute the main program's functionality. @@ -454,6 +414,19 @@ def main(): parser = setup_argparser() args = parser.parse_args() + # If we have requested to log stdout, set it up to be logged. + if args.logstdout: + if args.debug_lvl == 1: + lformat = DEBUG_FORMAT + else: + lformat = LFORMAT + LOG_UTIL.configure(lformat, args.debug_lvl) + + LOGGER.info("INFO Logging Level -- Enabled") + LOGGER.warning("WARNING Logging Level -- Enabled") + LOGGER.critical("CRITICAL Logging Level -- Enabled") + LOGGER.debug("DEBUG Logging Level -- Enabled") + rc = args.func(args) sys.exit(rc) diff --git a/maestrowf/utils.py b/maestrowf/utils.py index d7404faec..cd7322282 100644 --- a/maestrowf/utils.py +++ b/maestrowf/utils.py @@ -30,6 +30,7 @@ """A collection of more general utility functions.""" from collections import OrderedDict +import coloredlogs import logging import os import string @@ -37,13 +38,14 @@ from six.moves.urllib.request import urlopen from six.moves.urllib.error import HTTPError, URLError import time +import datetime LOGGER = logging.getLogger(__name__) def get_duration(time_delta): """ - Covert durations to HH:MM:SS format. + Convert durations to HH:MM:SS format. :params time_delta: A time difference in datatime format. :returns: A formatted string in HH:MM:SS @@ -58,6 +60,24 @@ def get_duration(time_delta): .format(days, hours, minutes, seconds) +def round_datetime_seconds(input_datetime): + """ + Round datetime to the nearest whole second. + + Solution referenced from: https://stackoverflow.com/questions/47792242/ + rounding-time-off-to-the-nearest-second-python. + + :params input_datetime: A datetime in datatime format. + :returns: ``input_datetime`` rounded to the nearest whole second + """ + new_datetime = input_datetime + + if new_datetime.microsecond >= 500000: + new_datetime = new_datetime + datetime.timedelta(seconds=1) + + return new_datetime.replace(microsecond=0) + + def generate_filename(path, append_time=True): """ Generate a non-conflicting file name. @@ -258,3 +278,74 @@ def create_dictionary(list_keyvalues, token=":"): raise ValueError(msg) return _dict + + +class LoggerUtility: + """Utility class for setting up logging consistently.""" + + def __init__(self, logger): + """ + Initialize a new LoggerUtility class instance. + + :param logger: An instance of a logger to configure. + """ + self._logger = logger + + def configure(self, log_format, log_lvl=2, colors=True): + """ + Configures the general logging facility. + + :param log_format: String containing the desired logging format. + :param log_lvl: Integer level (1-5) to set the logger to. 
+ """ + logging.basicConfig(level=self.map_level(log_lvl), format=log_format) + if colors: + coloredlogs.install(level=self.map_level(log_lvl), + logger=self._logger, fmt=log_format) + + def add_stream_handler(self, log_format, log_lvl=2): + """ + Add a stream handler to logging. + + :param log_format: String containing the desired logging format. + :param log_lvl: Integer level (1-5) to set the logger to. + """ + # Create the FileHandler and add it to the logger. + sh = logging.StreamHandler() + sh.setLevel(self.map_level(log_lvl)) + sh.setFormatter(logging.Formatter(log_format)) + self._logger.addHandler(sh) + + def add_file_handler(self, log_path, log_format, log_lvl=2): + """ + Add a file handler to logging. + + :param log_path: String containing the file path to store logging. + :param log_format: String containing the desired logging format. + :param log_lvl: Integer level (1-5) to set the logger to. + """ + # Create the FileHandler and add it to the logger. + formatter = logging.Formatter(log_format) + + fh = logging.FileHandler(log_path) + fh.setLevel(self.map_level(log_lvl)) + fh.setFormatter(formatter) + self._logger.addHandler(fh) + + @staticmethod + def map_level(log_lvl): + """ + Map level 1-5 to their respective logging enumerations. + + :param log_lvl: Integer level (1-5) representing logging verbosity. + """ + if log_lvl == 1: + return logging.DEBUG + elif log_lvl == 2: + return logging.INFO + elif log_lvl == 3: + return logging.WARNING + elif log_lvl == 4: + return logging.ERROR + else: + return logging.CRITICAL diff --git a/requirements.txt b/requirements.txt index abaebe2d1..d3c724d24 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,14 +1,22 @@ -fabric -coverage -filelock PyYAML>=4.2b1 six +filelock +tabulate +enum34; python_version<'3.4' +dill +jsonschema>=3.2.0 +coloredlogs +chainmap; python_version<'3' +-e . 
+ +fabric +coverage sphinx_rtd_theme sphinx flake8 pydocstyle pylint tox -tabulate pytest -enum34; python_version<'3.4' +pytest-cov +pre-commit diff --git a/samples/lulesh/lulesh_sample1_macosx.yaml b/samples/lulesh/lulesh_sample1_macosx.yaml index 8a840ee64..4dfa9612e 100644 --- a/samples/lulesh/lulesh_sample1_macosx.yaml +++ b/samples/lulesh/lulesh_sample1_macosx.yaml @@ -4,6 +4,12 @@ description: env: variables: + # DEFAULTS FOR MONTECARLO PGEN EXAMPLE + SMIN: 10 + SMAX: 30 + TRIALS: 50 + ITER: 100 + OUTPUT_PATH: ./sample_output/lulesh labels: diff --git a/samples/lulesh/lulesh_sample1_unix.yaml b/samples/lulesh/lulesh_sample1_unix.yaml index e5d98ede7..247bebc5b 100644 --- a/samples/lulesh/lulesh_sample1_unix.yaml +++ b/samples/lulesh/lulesh_sample1_unix.yaml @@ -4,6 +4,12 @@ description: env: variables: + # DEFAULTS FOR MONTECARLO PGEN EXAMPLE + SMIN: 10 + SMAX: 30 + TRIALS: 50 + ITER: 100 + OUTPUT_PATH: ./sample_output/lulesh labels: diff --git a/samples/lulesh/lulesh_sample1_unix_slurm.yaml b/samples/lulesh/lulesh_sample1_unix_slurm.yaml index 1bb921b60..c9756e4ea 100644 --- a/samples/lulesh/lulesh_sample1_unix_slurm.yaml +++ b/samples/lulesh/lulesh_sample1_unix_slurm.yaml @@ -21,6 +21,7 @@ batch: bank : baasic queue : pbatch gres : ignore + reservation : test_reservation study: - name: make-lulesh @@ -43,6 +44,7 @@ study: depends: [make-lulesh] nodes: 2 procs: 27 + exclusive : True walltime: "00:10:00" global.parameters: diff --git a/samples/parameterization/custom_generator.py b/samples/parameterization/custom_generator.py index 820f0847d..1b504c791 100644 --- a/samples/parameterization/custom_generator.py +++ b/samples/parameterization/custom_generator.py @@ -3,7 +3,7 @@ from maestrowf.datastructures.core import ParameterGenerator -def get_custom_generator(): +def get_custom_generator(env, **kwargs): """ Create a custom populated ParameterGenerator. diff --git a/samples/parameterization/lulesh_custom_gen.py b/samples/parameterization/lulesh_custom_gen.py index b88d17729..3dbb95352 100644 --- a/samples/parameterization/lulesh_custom_gen.py +++ b/samples/parameterization/lulesh_custom_gen.py @@ -3,7 +3,7 @@ from maestrowf.datastructures.core import ParameterGenerator -def get_custom_generator(): +def get_custom_generator(env, **kwargs): """ Create a custom populated ParameterGenerator. diff --git a/samples/parameterization/lulesh_montecarlo_args.py b/samples/parameterization/lulesh_montecarlo_args.py index c035cafae..a46871679 100644 --- a/samples/parameterization/lulesh_montecarlo_args.py +++ b/samples/parameterization/lulesh_montecarlo_args.py @@ -5,7 +5,7 @@ from maestrowf.datastructures.core import ParameterGenerator -def get_custom_generator(**kwargs): +def get_custom_generator(env, **kwargs): """ Create a custom populated ParameterGenerator. @@ -17,10 +17,10 @@ def get_custom_generator(**kwargs): :returns: A ParameterGenerator populated with parameters. 
""" p_gen = ParameterGenerator() - trials = int(kwargs.get("trials")) - size_min = int(kwargs.get("smin")) - size_max = int(kwargs.get("smax")) - iterations = int(kwargs.get("iter")) + trials = int(kwargs.get("trials", env.find("TRIALS").value)) + size_min = int(kwargs.get("smin", env.find("SMIN").value)) + size_max = int(kwargs.get("smax", env.find("SMAX").value)) + iterations = int(kwargs.get("iter", env.find("ITER").value)) params = { "TRIAL": { "values": [i for i in range(1, trials)], diff --git a/setup.py b/setup.py index d4099b7a3..a9ff08517 100644 --- a/setup.py +++ b/setup.py @@ -1,37 +1,54 @@ from maestrowf import __version__ from setuptools import setup, find_packages -setup(name='maestrowf', - description='A tool and library for specifying and conducting general ' - 'workflows.', - version=__version__, - author='Francesco Di Natale', - author_email='dinatale3@llnl.gov', - url='https://github.com/llnl/maestrowf', - license='MIT License', - packages=find_packages(), - entry_points={ - 'console_scripts': [ - 'maestro = maestrowf.maestro:main', - 'conductor = maestrowf.conductor:main', - ] - }, - install_requires=[ - 'PyYAML>=4.2b1', - 'six', - "filelock", - "tabulate", - "enum34 ; python_version<'3.4'" - ], - extras_require={}, - classifiers=[ - 'Development Status :: 3 - Alpha', - 'Intended Audience :: Developers', - 'Operating System :: Unix', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - ], - ) +setup( + name='maestrowf', + description='A tool to easily orchestrate general computational workflows ' + 'both locally and on supercomputers.', + version=__version__, + author='Francesco Di Natale', + maintainer='Francesco Di Natale', + author_email='dinatale3@llnl.gov', + url='https://github.com/llnl/maestrowf', + license='MIT License', + packages=find_packages(), + entry_points={ + 'console_scripts': [ + 'maestro = maestrowf.maestro:main', + 'conductor = maestrowf.conductor:main', + ] + }, + install_requires=[ + 'PyYAML>=4.2b1', + 'six', + "filelock", + "tabulate", + "enum34 ; python_version<'3.4'", + "dill", + "jsonschema>=3.2.0", + "coloredlogs", + "chainmap ; python_version<'3'", + ], + extras_require={}, + long_description_content_type='text/markdown', + download_url='https://pypi.org/project/maestrowf/', + python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*', + classifiers=[ + 'Development Status :: 5 - Production/Stable', + 'Operating System :: Unix', + 'Operating System :: MacOS :: MacOS X', + 'Intended Audience :: Developers', + 'Intended Audience :: Education', + 'Intended Audience :: Science/Research', + 'Topic :: Scientific/Engineering', + 'Topic :: System :: Distributed Computing', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + ], + package_data={ + 'maestrowf': ['maestrowf/datastructures/schemas.json'], + }, + include_package_data=True, +) diff --git a/tox.ini b/tox.ini index 9c5536a4d..8f818c857 100644 --- a/tox.ini +++ b/tox.ini @@ -10,7 +10,6 @@ skip_missing_interpreters=True [travis] python = 2.7: py27 - 3.4: py34 3.5: py35 3.6: py36