diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index c8aa868..a423339 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -13,7 +13,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [2.7, 3.5, 3.6, 3.7, 3.8] + python-version: [3.5, 3.6, 3.7, 3.8] steps: - uses: actions/checkout@v2 diff --git a/.gitignore b/.gitignore index 064594c..04c1a5b 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ coverage.xml .tox .*.sw[op] .idea/ +.eggs +build diff --git a/boundary_layer/builders/primary.py b/boundary_layer/builders/primary.py index 91d2876..a17b2fc 100644 --- a/boundary_layer/builders/primary.py +++ b/boundary_layer/builders/primary.py @@ -14,6 +14,7 @@ # limitations under the License. import datetime +from marshmallow import ValidationError import boundary_layer from boundary_layer.builders.base import DagBuilderBase from boundary_layer.schemas.dag import DagArgsSchema @@ -28,16 +29,15 @@ def preamble(self): self.reference_path) template = self.get_jinja_template('primary_preamble.j2') - - dag_args_dumped = DagArgsSchema(context={'for_dag_output': True}).dump( - self.dag.get('dag_args', {})) - if dag_args_dumped.errors: + try: + dag_args = DagArgsSchema(context={'for_dag_output': True}).dump( + self.dag.get('dag_args', {})) + except ValidationError as err: # should not happen because the schema was validated upon load, # but we should check raise Exception('Error serializing dag_args: {}'.format( - dag_args_dumped.errors)) + err.messages)) - dag_args = dag_args_dumped.data dag_args['dag_id'] = self.build_dag_id() default_task_args = self.dag.get('default_task_args', {}) diff --git a/boundary_layer/oozier/actions.py b/boundary_layer/oozier/actions.py index 87c3661..ff22e6c 100644 --- a/boundary_layer/oozier/actions.py +++ b/boundary_layer/oozier/actions.py @@ -53,11 +53,9 @@ def schema(self): pass def __init__(self, context, action_metadata, data): - loaded = 
self.schema(context=context).load(data) - if loaded.errors: - raise ma.ValidationError(loaded.errors) + loaded_data = self.schema(context=context).load(data) - super(OozieActionBuilderWithSchema, self).__init__(context, action_metadata, loaded.data) + super(OozieActionBuilderWithSchema, self).__init__(context, action_metadata, loaded_data) def get_action(self): result = self.action_metadata.copy() @@ -67,7 +65,7 @@ def get_action(self): class OozieSubWorkflowActionSchema(OozieBaseSchema): - app_path = ma.fields.String(required=True, load_from='app-path') + app_path = ma.fields.String(required=True, data_key='app-path') propagate_configuration = ma.fields.Dict(allow_none=True) diff --git a/boundary_layer/oozier/parse.py b/boundary_layer/oozier/parse.py index 9891bb5..e101dbc 100644 --- a/boundary_layer/oozier/parse.py +++ b/boundary_layer/oozier/parse.py @@ -18,6 +18,7 @@ import networkx as nx import six +from marshmallow import ValidationError from boundary_layer.graph import _GraphUtil from boundary_layer.logger import logger from boundary_layer import exceptions, VERSION_STRING, plugins @@ -120,20 +121,20 @@ def _parse_workflow(self, filename, cluster_config, oozie_config): parsed = xmltodict.parse( self.file_fetcher.fetch_file_content(filename)) - loaded = OozieWorkflowSchema(context={ - 'cluster_config': cluster_config, - 'oozie_plugin': oozie_config, - 'macro_translator': JspMacroTranslator(oozie_config.jsp_macros()), - 'production': self.production, - }).load(parsed) - - if loaded.errors: + try: + data = OozieWorkflowSchema(context={ + 'cluster_config': cluster_config, + 'oozie_plugin': oozie_config, + 'macro_translator': JspMacroTranslator(oozie_config.jsp_macros()), + 'production': self.production, + }).load(parsed) + except ValidationError as err: raise Exception('Errors parsing file {}: {}'.format( filename, - loaded.errors)) + err.messages)) - data_copy = loaded.data.copy() - data_copy.update(self.partition_actions(loaded.data)) + data_copy = data.copy() 
+ data_copy.update(self.partition_actions(data)) return data_copy diff --git a/boundary_layer/oozier/schema.py b/boundary_layer/oozier/schema.py index f06fdeb..6abad7d 100644 --- a/boundary_layer/oozier/schema.py +++ b/boundary_layer/oozier/schema.py @@ -17,10 +17,13 @@ class OozieBaseSchema(ma.Schema): + class Meta: + unknown = ma.INCLUDE + singletons_to_lists = [] @ma.pre_load - def _convert_singletons_to_lists(self, data): + def _convert_singletons_to_lists(self, data, **kwargs): if all(isinstance(data.get(key, []), list) for key in self.singletons_to_lists): return data @@ -34,15 +37,15 @@ def _convert_singletons_to_lists(self, data): class OozieNamedObjectSchema(OozieBaseSchema): - name = ma.fields.String(required=True, load_from='@name') + name = ma.fields.String(required=True, data_key='@name') class OozieFlowControlSchema(OozieBaseSchema): - to = ma.fields.String(required=True, load_from='@to') + to = ma.fields.String(required=True, data_key='@to') class OozieForkStartSchema(OozieBaseSchema): - start = ma.fields.String(required=True, load_from='@start') + start = ma.fields.String(required=True, data_key='@start') class OozieForkSchema(OozieNamedObjectSchema): @@ -51,7 +54,7 @@ class OozieForkSchema(OozieNamedObjectSchema): singletons_to_lists = ['path'] @ma.post_load - def add_base_operator(self, data): + def add_base_operator(self, data, **kwargs): operator = { 'name': data['name'], 'type': 'flow_control', @@ -64,7 +67,7 @@ def add_base_operator(self, data): class OozieJoinSchema(OozieNamedObjectSchema, OozieFlowControlSchema): @ma.post_load - def add_base_operator(self, data): + def add_base_operator(self, data, **kwargs): operator = { 'name': data['name'], 'type': 'flow_control', @@ -101,7 +104,7 @@ def _get_action_builder(self, data): return keyed_action_builders[keys_present[0]] @ma.validates_schema(pass_original=True) - def one_action_type(self, _, original): + def one_action_type(self, _, original, **kwargs): """ Runs the validation checks to make 
sure that exactly one known action is present, and prints an error message based on the content of the original, unparsed input @@ -109,7 +112,7 @@ def one_action_type(self, _, original): self._get_action_builder(original) @ma.post_load(pass_original=True) - def fetch_base_action(self, data, original): + def fetch_base_action(self, data, original, **kwargs): builder_cls = self._get_action_builder(original) return builder_cls(self.context, data, original[builder_cls.key]) @@ -120,7 +123,7 @@ class OozieKillSchema(OozieNamedObjectSchema): class OozieCaseSchema(OozieFlowControlSchema): - text = ma.fields.String(required=True, load_from='#text') + text = ma.fields.String(required=True, data_key='#text') class OozieSwitchSchema(OozieBaseSchema): @@ -135,7 +138,7 @@ class OozieDecisionSchema(OozieNamedObjectSchema): class OozieWorkflowAppSchema(OozieNamedObjectSchema): - name = ma.fields.String(required=True, load_from='@name') + name = ma.fields.String(required=True, data_key='@name') action = ma.fields.List(ma.fields.Nested(OozieActionSchema), missing=[]) join = ma.fields.List(ma.fields.Nested(OozieJoinSchema), missing=[]) fork = ma.fields.List(ma.fields.Nested(OozieForkSchema), missing=[]) @@ -150,8 +153,8 @@ class OozieWorkflowAppSchema(OozieNamedObjectSchema): class OozieWorkflowSchema(OozieBaseSchema): workflow_app = ma.fields.Nested( - OozieWorkflowAppSchema, load_from='workflow-app', required=True) + OozieWorkflowAppSchema, data_key='workflow-app', required=True) @ma.post_load - def return_workflow(self, data): + def return_workflow(self, data, **kwargs): return data['workflow_app'] diff --git a/boundary_layer/plugins/plugin_manager.py b/boundary_layer/plugins/plugin_manager.py index 40aeefe..e090842 100644 --- a/boundary_layer/plugins/plugin_manager.py +++ b/boundary_layer/plugins/plugin_manager.py @@ -64,14 +64,15 @@ def parse_plugin_config(self, plugin, config): 'Config schema for plugin {} is not a marshmallow Schema. 
' 'Found: {}'.format(plugin.name, plugin.config_schema_cls)) - parsed_config = plugin.config_schema_cls().load(config or {}) - if parsed_config.errors: + try: + parsed_config_data = plugin.config_schema_cls().load(config or {}) + except ValidationError as err: raise Exception( 'Errors parsing configuration for plugin {}: {}'.format( plugin.name, - parsed_config.errors)) + err.messages)) - return parsed_config.data + return parsed_config_data def insert_imports(self, plugin_config): objects = [] diff --git a/boundary_layer/registry/registry.py b/boundary_layer/registry/registry.py index fba4f7c..f17dfd6 100644 --- a/boundary_layer/registry/registry.py +++ b/boundary_layer/registry/registry.py @@ -17,6 +17,7 @@ import os from enum import Enum import yaml +from marshmallow import ValidationError from boundary_layer.logger import logger from boundary_layer import util @@ -177,13 +178,13 @@ def load_from_file(self, filename): logger.debug('validating item %s against schema %s', item, self.spec_schema_cls.__name__) - - loaded = self.spec_schema_cls().load(item) - if loaded.errors: + try: + data = self.spec_schema_cls().load(item) + except ValidationError as err: raise InvalidConfig('Invalid config spec in file {}: {}'.format( - filename, loaded.errors)) + filename, err.messages)) - return loaded.data + return data def load_configs(self, config_paths): if not isinstance(config_paths, list): diff --git a/boundary_layer/registry/types/operator.py b/boundary_layer/registry/types/operator.py index 85a14e5..8586c88 100644 --- a/boundary_layer/registry/types/operator.py +++ b/boundary_layer/registry/types/operator.py @@ -14,6 +14,7 @@ # limitations under the License. 
import six +from marshmallow import ValidationError from boundary_layer.logger import logger from boundary_layer.registry import ConfigFileRegistry, RegistryNode, NodeTypes from boundary_layer.schemas.internal.operators import OperatorSpecSchema @@ -79,15 +80,14 @@ def imports(self): 'resolve_properties() has not been called yet!'.format( self)) - loaded = ImportSchema().load(self.config.get('imports', {})) - - assert not loaded.errors, \ - ('Internal error: processing `imports` config {} for ' - 'operator `{}`').format( + try: + result = ImportSchema().load(self.config.get('imports', {})) + except ValidationError as err: + raise Exception( + ('Internal error: processing `imports` config {} for ' + 'operator `{}`: {}').format( self.config.get('imports', {}), - self.name) - - result = loaded.data + self.name, err.messages)) if self.operator_class: result.setdefault('objects', []) diff --git a/boundary_layer/registry/types/preprocessor.py b/boundary_layer/registry/types/preprocessor.py index 509e41a..41650df 100644 --- a/boundary_layer/registry/types/preprocessor.py +++ b/boundary_layer/registry/types/preprocessor.py @@ -14,6 +14,7 @@ # limitations under the License. 
import abc +from marshmallow import ValidationError from boundary_layer.registry import Registry, RegistryNode, NodeTypes @@ -43,14 +44,15 @@ def __init__(self, properties): self.properties = None return - loaded = self.properties_schema_cls().load(properties) - if loaded.errors: + try: + data = self.properties_schema_cls().load(properties) + except ValidationError as err: raise Exception( 'Error parsing properties for preprocessor `{}`: {}'.format( self.type, - loaded.errors)) + err.messages)) - self.properties = loaded.data + self.properties = data @property def properties_schema_cls(self): diff --git a/boundary_layer/schemas/base.py b/boundary_layer/schemas/base.py index c42e03d..06be7d7 100644 --- a/boundary_layer/schemas/base.py +++ b/boundary_layer/schemas/base.py @@ -22,7 +22,7 @@ class StrictSchema(ma.Schema): @ma.validates_schema(pass_original=True) - def check_no_unknowns(self, _, original_data): + def check_no_unknowns(self, _, original_data, **kwargs): def check_single_datum(datum): unknown = set(datum) - set(self.fields) if unknown: diff --git a/boundary_layer/schemas/dag.py b/boundary_layer/schemas/dag.py index 15b490b..99c1dc1 100644 --- a/boundary_layer/schemas/dag.py +++ b/boundary_layer/schemas/dag.py @@ -42,7 +42,7 @@ class GeneratorSchema(ReferenceSchema): regex_blocklist = fields.List(fields.String()) @validates_schema - def check_task_id_mode(self, data): + def check_task_id_mode(self, data, **kwargs): if 'auto_task_id_mode' not in data: return @@ -133,7 +133,7 @@ class DagArgsSchema(StrictSchema): access_control = fields.Dict() @validates_schema - def validate_callbacks(self, data): + def validate_callbacks(self, data, **kwargs): callbacks = ['sla_miss_callback', 'on_success_callback', 'on_failure_callback'] for cb in callbacks: @@ -145,7 +145,7 @@ def validate_callbacks(self, data): [cb]) @validates_schema - def validate_default_view(self, data): + def validate_default_view(self, 
data, **kwargs): if 'default_view' not in data: return @@ -157,7 +157,7 @@ def validate_default_view(self, data): ['default_view']) @validates_schema - def validate_orientation(self, data): + def validate_orientation(self, data, **kwargs): if 'orientation' not in data: return @@ -168,7 +168,7 @@ def validate_orientation(self, data): ['orientation']) @validates_schema - def validate_template_undefined(self, data): + def validate_template_undefined(self, data, **kwargs): if 'template_undefined' not in data: return if not re.compile('<<.+>>').match(data['template_undefined']): @@ -177,7 +177,7 @@ def validate_template_undefined(self, data): ['template_undefined']) @post_dump - def dagrun_timeout_to_timedelta(self, data): + def dagrun_timeout_to_timedelta(self, data, **kwargs): if not self.context.get('for_dag_output'): return data if 'dagrun_timeout' in data: @@ -199,7 +199,7 @@ class PrimaryDagSchema(BaseDagSchema): default_task_args = fields.Dict() @validates_schema - def validate_compatibility_version(self, data): + def validate_compatibility_version(self, data, **kwargs): if not data.get('compatibility_version'): return @@ -222,4 +222,4 @@ def validate_compatibility_version(self, data): 'Incompatible boundary_layer version: This workflow ' 'is for the incompatible prior version {}. 
Use the ' 'migrate-workflow script to update it.'.format(version), - ['compatibility_version']) + ['compatibility_version']) \ No newline at end of file diff --git a/boundary_layer/schemas/internal/base.py b/boundary_layer/schemas/internal/base.py index 5fc2a1f..da3218a 100644 --- a/boundary_layer/schemas/internal/base.py +++ b/boundary_layer/schemas/internal/base.py @@ -33,7 +33,7 @@ class BaseSpecSchema(StrictSchema): schema_extends = ma.fields.String() @ma.validates_schema - def jsonschema_or_extended(self, data): + def jsonschema_or_extended(self, data, **kwargs): if 'parameters_jsonschema' in data or 'schema_extends' in data: return @@ -43,7 +43,7 @@ def jsonschema_or_extended(self, data): data.get('name', data))) @ma.validates_schema - def check_jsonschema(self, data): + def check_jsonschema(self, data, **kwargs): if 'parameters_jsonschema' not in data: return # Make sure that `properties` is present, because it's not actually diff --git a/boundary_layer/schemas/internal/operators.py b/boundary_layer/schemas/internal/operators.py index 12c9669..632cc11 100644 --- a/boundary_layer/schemas/internal/operators.py +++ b/boundary_layer/schemas/internal/operators.py @@ -32,7 +32,7 @@ class OperatorSpecSchema(BaseSpecSchema): property_preprocessors = ma.fields.List(ma.fields.Nested(PropertyPreprocessorSchema)) @ma.validates_schema - def valid_preprocessor_property_names(self, data): + def valid_preprocessor_property_names(self, data, **kwargs): preprocessors = data.get('property_preprocessors', []) if not preprocessors: diff --git a/boundary_layer/workflow.py b/boundary_layer/workflow.py index f51bcb1..b587773 100644 --- a/boundary_layer/workflow.py +++ b/boundary_layer/workflow.py @@ -15,6 +15,7 @@ import os from collections import namedtuple, Counter +from marshmallow import ValidationError import six import yaml @@ -288,14 +289,13 @@ def parse_dag(dag, schema_cls): """ Parse the DAG using the specified schema class. Raise an exception if any errors are detected. 
""" - loaded = schema_cls().load(dag) - if loaded.errors: + try: + parsed = schema_cls().load(dag) + except ValidationError as err: dag_description = 'primary' if issubclass( schema_cls, PrimaryDagSchema) else 'sub' raise Exception('Found errors in {} dag: {}'.format( - dag_description, loaded.errors)) - - parsed = loaded.data + dag_description, err.messages)) registries = { 'resources': plugins.manager.resources, diff --git a/boundary_layer_default_plugin/oozie_actions.py b/boundary_layer_default_plugin/oozie_actions.py index 4c5fdec..e9e96c1 100644 --- a/boundary_layer_default_plugin/oozie_actions.py +++ b/boundary_layer_default_plugin/oozie_actions.py @@ -28,7 +28,7 @@ class OozieFileSystemSourceDestSchema(OozieBaseSchema): class OozieFileSystemPathSchema(OozieBaseSchema): - path = ma.fields.String(required=True, load_from='@path') + path = ma.fields.String(required=True, data_key='@path') class OozieFileSystemActionSchema(OozieBaseSchema): @@ -83,7 +83,6 @@ class OozieHadoopConfigurationSchema(OozieBaseSchema): _property = ma.fields.List( ma.fields.Nested(OozieNameValueSchema), required=True, - load_from='property', - dump_to='property', + data_key='property', attribute='property') @@ -93,9 +92,9 @@ class OozieHadoopConfigurationSchema(OozieBaseSchema): class OozieMapReduceActionSchema(OozieBaseSchema): arg = ma.fields.List(ma.fields.String(), missing=[]) configuration = ma.fields.Nested(OozieHadoopConfigurationSchema) - job_tracker = ma.fields.String(required=True, load_from='job-tracker') - name_node = ma.fields.String(required=True, load_from='name-node') - main_class = ma.fields.String(load_from='main-class') + job_tracker = ma.fields.String(required=True, data_key='job-tracker') + name_node = ma.fields.String(required=True, data_key='name-node') + main_class = ma.fields.String(data_key='main-class') class OozieMapReduceActionBuilder(OozieActionBuilderWithSchema): diff --git a/boundary_layer_default_plugin/preprocessors.py 
b/boundary_layer_default_plugin/preprocessors.py index f6dce48..faed01d 100644 --- a/boundary_layer_default_plugin/preprocessors.py +++ b/boundary_layer_default_plugin/preprocessors.py @@ -48,7 +48,7 @@ class BuildKubernetesSchema(StrictSchema): class_name = ma.fields.String(required=True) @ma.validates_schema - def check_valid_class(self, data): + def check_valid_class(self, data, **kwargs): ALLOWED_CLASS = ['airflow.contrib.kubernetes.volume.Volume', 'airflow.contrib.kubernetes.volume_mount.VolumeMount', 'airflow.contrib.kubernetes.secret.Secret', @@ -89,7 +89,7 @@ class BuildTimedeltaSchema(StrictSchema): units = ma.fields.String(required=True) @ma.validates_schema - def check_valid_units(self, data): + def check_valid_units(self, data, **kwargs): ALLOWED_UNITS = ['seconds', 'minutes', 'hours', 'days'] if data.get('units') not in ALLOWED_UNITS: raise ma.ValidationError( diff --git a/setup.py b/setup.py index 361e780..9e108d7 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ 'jsonschema>=2.6.0,<3.0', 'jinja2>=2.8.1,<3.0', 'pyyaml>=4.2b1', - 'marshmallow>=2.13.6,<3.0', + 'marshmallow>=3.0,<4.0', 'networkx>=2.1,<2.3', 'xmltodict>=0.11.0,<1.0', 'six>=1.11.0,<2.0', @@ -63,5 +63,5 @@ license = 'Apache License 2.0', keywords = 'airflow', zip_safe = False, - python_requires='>=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*', + python_requires='>=3.5', ) diff --git a/tox.ini b/tox.ini index 3fd3999..c1f168a 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,7 @@ [tox] -envlist = py27, py3, lint +envlist = py3, lint [testenv] deps = pytest==3.8.1 pytest-cov==2.6.0 pytest-mock==1.10.0