diff --git a/docs/conf.py b/docs/conf.py
index 28f99c7..401939c 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -26,7 +26,7 @@
 # The short X.Y version
 version = '0.1'
 # The full version, including alpha/beta/rc tags
-release = '0.1.6'
+release = '0.1.7'
 
 # -- General configuration ---------------------------------------------------
 
diff --git a/requirements.txt b/requirements.txt
index 6cbcf0c..cac17b4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,8 +1,8 @@
-ipython==6.4.0
-numpy==1.14.3
-pydot_ng==1.0.0
-pytest==3.6.0
-scikit_learn==0.19.1
-scipy==1.1.0
-setuptools==39.2.0
-typing==3.6.4
+ipython>=6.4.0
+numpy>=1.14.0
+pydot_ng>=1.0.0
+pytest>=3.6.0
+scikit_learn>=0.19.0
+scipy>=1.0.0
+setuptools>=39.2.0
+typing>=3.6.4
diff --git a/setup.py b/setup.py
index 956875d..76ce01f 100644
--- a/setup.py
+++ b/setup.py
@@ -13,11 +13,11 @@
 setup(name='steppy',
       packages=['steppy'],
-      version='0.1.6',
+      version='0.1.7',
       description='A lightweight, open-source, Python library for fast and reproducible experimentation',
       long_description=long_description,
       url='https://github.com/minerva-ml/steppy',
-      download_url='https://github.com/minerva-ml/steppy/archive/0.1.6.tar.gz',
+      download_url='https://github.com/minerva-ml/steppy/archive/0.1.7.tar.gz',
       author='Kamil A. Kaczmarek, Jakub Czakon',
       author_email='kamil.kaczmarek@neptune.ml, jakub.czakon@neptune.ml',
       keywords=['machine-learning', 'reproducibility', 'pipeline', 'data-science'],
diff --git a/steppy/base.py b/steppy/base.py
index 21eb8f5..a6ffa8f 100644
--- a/steppy/base.py
+++ b/steppy/base.py
@@ -1,4 +1,3 @@
-import glob
 import os
 import pprint
 import shutil
@@ -28,24 +27,25 @@ class Step:
     (see: :func:`~steppy.utils.persist_as_png`)
     or return step in a jupyter notebook cell.
 
     Attributes:
-        name (str): Step name.
-            Each step in a pipeline must have a unique name. This names is used to persist or cache
-            transformers and outputs of this Step.
-
         transformer (obj): object that inherits from BaseTransformer or Step instance.
             When Step instance is passed, transformer from that Step will be copied and used
             to perform transformations. It is useful when both train and valid data are passed
             in one pipeline (common situation in deep learning).
+        name (str): Step name.
+            Each step in a pipeline must have a unique name. It is the name under which
+            transformers and outputs of this Step are persisted.
+            Default is the transformer's class name (see ``_format_step_name``).
+
         experiment_directory (str): path to the directory where all execution artifacts will be
-            stored. The following sub-directories will be created, if they were not created by
+            stored.
+            Default is ``~/.steppy``.
+            The following sub-directories will be created, if they were not created by
             other Steps:
 
             * transformers: transformer objects are persisted in this folder
             * outputs: step output dictionaries are persisted in this folder
               (if ``persist_output=True``)
-            * cache: step output dictionaries are cached in this folder
-              (if ``cache_output=True``).
 
         input_data (list): Elements of this list are keys in the data dictionary that is passed
             to the Step's `fit_transform` and `transform` methods.
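
For orientation, a minimal sketch of the two new constructor defaults described above, assuming this patch is applied (printed values are illustrative):

    from steppy.base import Step, IdentityOperation

    # Both name and experiment_directory may now be omitted:
    step = Step(transformer=IdentityOperation())
    print(step.name)     # 'IdentityOperation_0': class name plus auto-suffix, see _format_step_name below
    print(step.exp_dir)  # e.g. '/home/user/.steppy', the new default experiment directory
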
@@ -111,15 +111,14 @@ class Step:
             where both `images` and `labels` keys comes from `input`
             (see :attr:`~steppy.base.Step.input_data`)
 
-        cache_output (bool): If True, Step output dictionary will be cached to the
-            ``<experiment_directory>/cache/<name>``, when transform method of the Step transformer
+        cache_output (bool): If True, Step output dictionary will be cached in
+            ``self.output``, as soon as the transform method of the Step's transformer
             is completed.
             If the same Step is used multiple times, transform method is invoked
-            only once. Further invokes simply load output from the
-            ``<experiment_directory>/cache/<name>`` directory.
+            only once. Further invocations simply use the cached output.
             Default ``False``: do not cache outputs
 
             Warning:
-                One should always run `pipeline.clean_cache()` before executing
+                One should always run `pipeline.clean_cache_upstream_steps()` before executing
                 `pipeline.fit_transform(data)` or `pipeline.transform(data)`
                 When working with large datasets, cache might be very large.
@@ -136,7 +135,7 @@ class Step:
             When working with large datasets, cache might be very large.
 
         load_persisted_output (bool): If True, Step output dictionary already persisted to the
-            ``<experiment_directory>/cache/<name>`` will be loaded when Step is called.
+            ``<experiment_directory>/outputs/<name>`` will be loaded when Step is called.
             Default ``False``: do not load persisted output.
             Useful when debugging and working with ensemble models or time consuming feature
             extraction. One can easily persist already computed pieces of the pipeline and save
@@ -162,22 +161,29 @@ class Step:
     """
 
     def __init__(self,
-                 name,
                  transformer,
-                 experiment_directory,
+                 name=None,
+                 experiment_directory=None,
                  input_data=None,
                  input_steps=None,
                  adapter=None,
-                 is_trainable=False,
-                 cache_output=False,
-                 persist_output=False,
+                 is_fittable=True,
+                 force_fitting=True,
+                 persist_output=True,
+                 cache_output=False,
                  load_persisted_output=False,
-                 force_fitting=False,
                  persist_upstream_pipeline_structure=False):
 
-        assert isinstance(name, str), 'Step name must be str, got {} instead.'.format(type(name))
-        assert isinstance(experiment_directory, str), 'Step {} error, experiment_directory must ' \
-                                                      'be str, got {} instead.'.format(name, type(experiment_directory))
+        self.input_steps = input_steps or []
+        name = self._format_step_name(name, transformer)
+
+        if experiment_directory is not None:
+            assert isinstance(experiment_directory, str),\
+                'Step {} error, experiment_directory must ' \
+                'be str, got {} instead.'.format(name, type(experiment_directory))
+        else:
+            experiment_directory = os.path.join(os.path.expanduser("~"), '.steppy')
 
         if input_data is not None:
             assert isinstance(input_data, list), 'Step {} error, input_data must be list, ' \
                                                  'got {} instead.'.format(name, type(input_data))
@@ -193,31 +199,30 @@ def __init__(self,
                                                'got {} instead.'.format(name, type(cache_output))
         assert isinstance(persist_output, bool), 'Step {} error, persist_output must be bool, ' \
                                                  'got {} instead.'.format(name, type(persist_output))
-        assert isinstance(load_persisted_output, bool), 'Step {} error, load_persisted_output ' \
-                                                        'must be bool, got {} instead.'.format(name, type(
-            load_persisted_output))
+        assert isinstance(load_persisted_output, bool),\
+            'Step {} error, load_persisted_output ' \
+            'must be bool, got {} instead.'.format(name, type(load_persisted_output))
         assert isinstance(force_fitting, bool), 'Step {} error, force_fitting must be bool, ' \
                                                 'got {} instead.'.format(name, type(force_fitting))
-        assert isinstance(persist_upstream_pipeline_structure, bool), 'Step {} error, ' \
-            'persist_upstream_pipeline_structure must be bool, got {} instead.' \
+        assert isinstance(persist_upstream_pipeline_structure, bool),\
+            'Step {} error, persist_upstream_pipeline_structure must be bool, got {} instead.' \
             .format(name, type(persist_upstream_pipeline_structure))
 
-        logger.info('initializing Step {}...'.format(name))
+        logger.info('Initializing Step {}'.format(name))
 
         self.name = name
         self.transformer = transformer
-
-        self.input_steps = input_steps or []
         self.input_data = input_data or []
         self.adapter = adapter
-
-        self.is_trainable = is_trainable
+        self.is_fittable = is_fittable
         self.cache_output = cache_output
         self.persist_output = persist_output
         self.load_persisted_output = load_persisted_output
         self.force_fitting = force_fitting
 
         self.exp_dir = os.path.join(experiment_directory)
+        self.output = None
 
         self._prepare_experiment_directories()
 
         if persist_upstream_pipeline_structure:
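
The on-disk cache directory is gone; caching is now purely in-memory via ``self.output``. A short sketch of the intended round trip (illustrative data, assumes this patch is applied):

    from steppy.base import Step, IdentityOperation

    step = Step(transformer=IdentityOperation(),
                input_data=['input'],
                cache_output=True)

    data = {'input': {'X': [1, 2, 3]}}
    out = step.fit_transform(data)  # computes and stores the result in step.output
    out = step.transform(data)      # served from step.output, nothing is recomputed
    step.clean_cache_step()         # resets step.output to None

Note that with the new ``force_fitting=True`` default, ``fit_transform`` itself ignores the cache; only ``transform`` reads it.
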
@@ -257,7 +262,7 @@ def all_steps(self):
         return all_steps
 
     @property
-    def transformer_is_cached(self):
+    def transformer_is_persisted(self):
         """(bool): True if transformer exists under the directory
         ``<experiment_directory>/transformers/<name>``
         """
@@ -267,10 +272,13 @@ def transformer_is_cached(self):
 
     @property
     def output_is_cached(self):
-        """(bool): True if step outputs exist under the ``<experiment_directory>/cache/<name>``.
+        """(bool): True if step output exists in ``self.output``.
             See `cache_output`.
         """
-        return os.path.exists(self.exp_dir_cache_step)
+        return self.output is not None
 
     @property
     def output_is_persisted(self):
@@ -302,10 +310,10 @@ def fit_transform(self, data):
             dict: Step outputs from the ``self.transformer.fit_transform`` method
         """
         if self.output_is_cached and not self.force_fitting:
-            logger.info('Step {} loading cached output...'.format(self.name))
-            step_output_data = self._load_output(self.exp_dir_cache_step)
+            logger.info('Step {} using cached output'.format(self.name))
+            step_output_data = self.output
         elif self.output_is_persisted and self.load_persisted_output and not self.force_fitting:
-            logger.info('Step {} loading persisted output...'.format(self.name))
+            logger.info('Step {} loading persisted output from {}'.format(self.name, self.exp_dir_outputs_step))
             step_output_data = self._load_output(self.exp_dir_outputs_step)
         else:
             step_inputs = {}
@@ -320,7 +328,7 @@ def fit_transform(self, data):
                 step_inputs = self._adapt(step_inputs)
             else:
                 step_inputs = self._unpack(step_inputs)
-            step_output_data = self._cached_fit_transform(step_inputs)
+            step_output_data = self._fit_transform_operation(step_inputs)
         return step_output_data
 
     def transform(self, data):
@@ -349,10 +357,10 @@ def transform(self, data):
             dict: step outputs from the transformer.transform method
         """
         if self.output_is_cached:
-            logger.info('Step {} loading cached output...'.format(self.name))
-            step_output_data = self._load_output(self.exp_dir_cache_step)
+            logger.info('Step {} using cached output'.format(self.name))
+            step_output_data = self.output
         elif self.output_is_persisted and self.load_persisted_output:
-            logger.info('Step {} loading persisted output...'.format(self.name))
+            logger.info('Step {} loading persisted output from {}'.format(self.name, self.exp_dir_outputs_step))
             step_output_data = self._load_output(self.exp_dir_outputs_step)
         else:
             step_inputs = {}
@@ -367,18 +375,22 @@ def transform(self, data):
                 step_inputs = self._adapt(step_inputs)
             else:
                 step_inputs = self._unpack(step_inputs)
-            step_output_data = self._cached_transform(step_inputs)
+            step_output_data = self._transform_operation(step_inputs)
         return step_output_data
 
-    def clean_cache(self):
-        """Removes everything from the directory ``<experiment_directory>/cache``.
+    def clean_cache_step(self):
+        """Clean the in-memory cache of this Step.
         """
-        logger.info('cleaning cache...')
-        paths = glob.glob(os.path.join(self.exp_dir_cache, '*'))
-        for path in paths:
-            logger.info('removing {}'.format(path))
-            os.remove(path)
-        logger.info('cleaning cache done')
+        logger.info('Step {}, cleaning cache'.format(self.name))
+        self.output = None
+
+    def clean_cache_upstream_steps(self):
+        """Clean the in-memory cache of this Step and all its upstream Steps.
+        """
+        logger.info('Cleaning cache for the entire upstream pipeline')
+        for step in self.all_steps.values():
+            logger.info('Step {}, cleaning cache'.format(step.name))
+            step.output = None
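
A sketch of cleaning the cache across a small pipeline (step names follow the default ``<class name>_<k>`` scheme; data is illustrative):

    from steppy.base import Step, IdentityOperation

    step_a = Step(transformer=IdentityOperation(), input_data=['input'], cache_output=True)
    step_b = Step(transformer=IdentityOperation(), input_steps=[step_a], cache_output=True)

    # step_a is named 'IdentityOperation_0', step_b 'IdentityOperation_1'
    step_b.clean_cache_upstream_steps()  # resets .output on step_b and step_a
    output = step_b.fit_transform({'input': {'X': [0, 1]}})
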
""" - logger.info('cleaning cache...') - paths = glob.glob(os.path.join(self.exp_dir_cache, '*')) - for path in paths: - logger.info('removing {}'.format(path)) - os.remove(path) - logger.info('cleaning cache done') + logger.info('Step {}, cleaning cache'.format(self.name)) + self.output = None + + def clean_cache_upstream_steps(self): + """Clean cache for all steps that are upstream to `self`. + """ + logger.info('Cleaning cache for the entire upstream pipeline') + for step in self.all_steps.values(): + logger.info('Step {}, cleaning cache'.format(step.name)) + step.output = None def get_step(self, name): """Extracts step by name from the pipeline. @@ -406,91 +418,70 @@ def persist_pipeline_diagram(self, filepath): ' instead'.format(self.name, type(filepath)) persist_as_png(self.upstream_pipeline_structure, filepath) - def _copy_transformer(self, step, name, dirpath): - self.transformer = self.transformer.transformer - - original_filepath = os.path.join(step.exp_dir, 'transformers', step.name) - copy_filepath = os.path.join(dirpath, 'transformers', name) - logger.info('copying transformer from {} to {}'.format(original_filepath, copy_filepath)) - shutil.copyfile(original_filepath, copy_filepath) - - def _prepare_experiment_directories(self): - logger.info('initializing experiment directories under {}'.format(self.exp_dir)) - - for dir_name in ['transformers', 'outputs', 'cache']: - os.makedirs(os.path.join(self.exp_dir, dir_name), exist_ok=True) - - self.exp_dir_transformers = os.path.join(self.exp_dir, 'transformers') - self.exp_dir_outputs = os.path.join(self.exp_dir, 'outputs') - self.exp_dir_cache = os.path.join(self.exp_dir, 'cache') - - self.exp_dir_transformers_step = os.path.join(self.exp_dir_transformers, self.name) - self.exp_dir_outputs_step = os.path.join(self.exp_dir_outputs, '{}'.format(self.name)) - self.exp_dir_cache_step = os.path.join(self.exp_dir_cache, '{}'.format(self.name)) - - logger.info('done: initializing experiment directories') - - def _cached_fit_transform(self, step_inputs): - if self.is_trainable: - if self.transformer_is_cached and not self.force_fitting: + def _fit_transform_operation(self, step_inputs): + if self.is_fittable: + if self.transformer_is_persisted and not self.force_fitting: logger.info('Step {}, loading transformer from the {}' .format(self.name, self.exp_dir_transformers_step)) self.transformer.load(self.exp_dir_transformers_step) logger.info('Step {}, transforming...'.format(self.name)) step_output_data = self.transformer.transform(**step_inputs) + logger.info('Step {}, transforming completed'.format(self.name)) else: logger.info('Step {}, fitting and transforming...'.format(self.name)) step_output_data = self.transformer.fit_transform(**step_inputs) + logger.info('Step {}, fitting and transforming completed'.format(self.name)) logger.info('Step {}, persisting transformer to the {}' .format(self.name, self.exp_dir_transformers_step)) self.transformer.persist(self.exp_dir_transformers_step) else: - logger.info('Step {}, transforming...'.format(self.name)) + logger.info('Step {}, is not fittable, transforming...'.format(self.name)) step_output_data = self.transformer.transform(**step_inputs) - + logger.info('Step {}, transforming completed'.format(self.name)) if self.cache_output: - logger.info('Step {}, caching output to the {}' - .format(self.name, self.exp_dir_cache_step)) - self._persist_output(step_output_data, self.exp_dir_cache_step) + logger.info('Step {}, caching output'.format(self.name)) + self.output = step_output_data if 
         if self.persist_output:
             logger.info('Step {}, persisting output to the {}'
                         .format(self.name, self.exp_dir_outputs_step))
             self._persist_output(step_output_data, self.exp_dir_outputs_step)
         return step_output_data
 
-    def _load_output(self, filepath):
-        logger.info('Step {}, loading from {}'.format(self.name, filepath))
-        return joblib.load(filepath)
-
-    def _persist_output(self, output_data, filepath):
-        joblib.dump(output_data, filepath)
-
-    def _cached_transform(self, step_inputs):
-        if self.is_trainable:
-            if self.transformer_is_cached:
+    def _transform_operation(self, step_inputs):
+        if self.is_fittable:
+            if self.transformer_is_persisted:
                 logger.info('Step {}, loading transformer from the {}'
                             .format(self.name, self.exp_dir_transformers_step))
                 self.transformer.load(self.exp_dir_transformers_step)
                 logger.info('Step {}, transforming...'.format(self.name))
                 step_output_data = self.transformer.transform(**step_inputs)
+                logger.info('Step {}, transforming completed'.format(self.name))
             else:
-                raise ValueError('No transformer cached {}'.format(self.name))
+                raise ValueError('No transformer persisted with name: {}. '
+                                 'Make sure that a fitted transformer exists under the directory: {}'
+                                 .format(self.name, self.exp_dir_transformers))
         else:
-            logger.info('Step {}, transforming...'.format(self.name))
+            logger.info('Step {} is not fittable, transforming...'.format(self.name))
             step_output_data = self.transformer.transform(**step_inputs)
-
+            logger.info('Step {}, transforming completed'.format(self.name))
         if self.cache_output:
-            logger.info('Step {}, caching output to the {}'
-                        .format(self.name, self.exp_dir_cache_step))
-            self._persist_output(step_output_data, self.exp_dir_cache_step)
+            logger.info('Step {}, caching output'.format(self.name))
+            self.output = step_output_data
         if self.persist_output:
             logger.info('Step {}, persisting output to the {}'
                         .format(self.name, self.exp_dir_outputs_step))
             self._persist_output(step_output_data, self.exp_dir_outputs_step)
         return step_output_data
 
+    def _load_output(self, filepath):
+        logger.info('Step {}, loading output from {}'.format(self.name, filepath))
+        return joblib.load(filepath)
+
+    def _persist_output(self, output_data, filepath):
+        joblib.dump(output_data, filepath)
+
     def _adapt(self, step_inputs):
-        logger.info('Step {}, adapting inputs...'.format(self.name))
+        logger.info('Step {}, adapting inputs'.format(self.name))
         try:
             return self.adapter.adapt(step_inputs)
         except AdapterError as e:
@@ -498,7 +489,7 @@ def _adapt(self, step_inputs):
             raise StepsError(msg) from e
 
     def _unpack(self, step_inputs):
-        logger.info('Step {}, unpacking inputs...'.format(self.name))
+        logger.info('Step {}, unpacking inputs'.format(self.name))
         unpacked_steps = {}
         key_to_step_names = defaultdict(list)
         for step_name, step_dict in step_inputs.items():
@@ -516,12 +507,58 @@ def _unpack(self, step_inputs):
                             for key, step_names in repeated_keys])
             raise StepsError(msg)
 
+    def _copy_transformer(self, step, name, dirpath):
+        self.transformer = self.transformer.transformer
+
+        original_filepath = os.path.join(step.exp_dir, 'transformers', step.name)
+        copy_filepath = os.path.join(dirpath, 'transformers', name)
+        logger.info('copying transformer from {} to {}'.format(original_filepath, copy_filepath))
+        shutil.copyfile(original_filepath, copy_filepath)
+
+    def _prepare_experiment_directories(self):
+        if not os.path.exists(os.path.join(self.exp_dir, 'outputs')):
+            logger.info('initializing experiment directories under {}'.format(self.exp_dir))
+            for dir_name in ['transformers', 'outputs']:
+                os.makedirs(os.path.join(self.exp_dir, dir_name), exist_ok=True)
+
+        self.exp_dir_transformers = os.path.join(self.exp_dir, 'transformers')
+        self.exp_dir_outputs = os.path.join(self.exp_dir, 'outputs')
+
+        self.exp_dir_transformers_step = os.path.join(self.exp_dir_transformers, self.name)
+        self.exp_dir_outputs_step = os.path.join(self.exp_dir_outputs, self.name)
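
With the cache folder dropped, the on-disk layout under the (default) experiment directory reduces to the following (a sketch; actual entries depend on the step names):

    ~/.steppy/
        transformers/IdentityOperation_0   # fitted transformer, written by transformer.persist()
        outputs/IdentityOperation_0        # output dict, written via joblib.dump() when persist_output=True
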
 
     def _get_steps(self, all_steps):
         for input_step in self.input_steps:
             all_steps = input_step._get_steps(all_steps)
         all_steps[self.name] = self
         return all_steps
 
+    def _format_step_name(self, name, transformer):
+        self._validate_step_name(name=name)
+        if name is not None:
+            name = str(name)
+        else:
+            name = transformer.__class__.__name__
+        return '{}{}'.format(name, self._get_step_suffix(name))
+
+    def _validate_step_name(self, name):
+        if name is not None:
+            assert isinstance(name, (str, float, int)),\
+                'Step name must be str, float or int. Got {} instead.'.format(type(name))
+
+    def _get_step_suffix(self, name):
+        """Returns the suffix '_k', where 'k' is one plus the highest increment
+        found among upstream steps that share the same base name.
+        """
+        highest_id = 0
+        upstream_steps = {}
+        for input_step in self.input_steps:
+            upstream_steps = input_step._get_steps(upstream_steps)
+        for key in upstream_steps.keys():
+            key_id = key.split('_')[-1]
+            key_stripped = key[:-len(key_id) - 1]
+            if key_stripped == name and key_id.isdigit():
+                highest_id = max(highest_id, int(key_id) + 1)
+        return '_{}'.format(highest_id)
+
     def _build_structure_dict(self, structure_dict):
         for input_step in self.input_steps:
             structure_dict = input_step._build_structure_dict(structure_dict)
@@ -540,7 +577,7 @@ def __str__(self):
 
 class BaseTransformer:
-    """Abstraction on two level fit and transform execution.
+    """Abstraction over ``fit`` and ``transform`` execution.
 
     Base transformer is an abstraction strongly inspired by the ``sklearn.Transformer`` and
     ``sklearn.Estimator``. Two main concepts are:
@@ -627,7 +664,7 @@ def persist(self, filepath):
 
         Args:
             filepath (str): filepath where the transformer parameters should be persisted
         """
-        joblib.dump({}, filepath)
+        raise NotImplementedError
 
 
 class IdentityOperation(BaseTransformer):
@@ -637,6 +674,10 @@ class IdentityOperation(BaseTransformer):
     def transform(self, **kwargs):
         return kwargs
 
+    def persist(self, filepath):
+        logger.info('"IdentityOperation" is not persistable')
+
 
 class StepsError(Exception):
     pass
 
@@ -644,7 +685,13 @@ class StepsError(Exception):
 
 def make_transformer(func):
     class StaticTransformer(BaseTransformer):
+        def persist(self, filepath):
+            logger.info('StaticTransformer is not persistable. '
+                        'Running "fit_transform()" on it simply runs "transform()".')
+
         def transform(self, *args, **kwargs):
             return func(*args, **kwargs)
-    return StaticTransformer()
+
+    _transformer = StaticTransformer()
+    _transformer.__class__.__name__ = func.__name__
+    return _transformer
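
Finally, a sketch of how ``make_transformer`` now interacts with default step naming (function and data are illustrative):

    from steppy.base import Step, make_transformer

    def to_lowercase(texts):
        return {'texts': [t.lower() for t in texts]}

    step = Step(transformer=make_transformer(to_lowercase), input_data=['input'])
    print(step.name)  # 'to_lowercase_0': func.__name__ propagates to the default Step name
    output = step.fit_transform({'input': {'texts': ['Steppy', 'PIPELINE']}})
    # {'texts': ['steppy', 'pipeline']}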