diff --git a/irrd/updates/tests/test_validators.py b/irrd/updates/tests/test_validators.py index 00c3d0332..5b9d66293 100644 --- a/irrd/updates/tests/test_validators.py +++ b/irrd/updates/tests/test_validators.py @@ -14,7 +14,7 @@ from irrd.scopefilter.validators import ScopeFilterValidator from irrd.storage.models import JournalEntryOrigin from irrd.utils.rpsl_samples import SAMPLE_INETNUM, SAMPLE_AS_SET, SAMPLE_MNTNER_CRYPT, SAMPLE_PERSON, SAMPLE_MNTNER, \ - SAMPLE_ROUTE, SAMPLE_MNTNER_MD5 + SAMPLE_ROUTE, SAMPLE_MNTNER_MD5, SAMPLE_ROUTE6 from irrd.utils.test_utils import flatten_mock_calls from irrd.utils.text import splitline_unicodesafe, remove_auth_hashes from ..parser import parse_change_requests @@ -155,13 +155,13 @@ def test_existing_person_mntner_change(self, prepare_mocks): assert not result.is_valid() print(result.error_messages) assert result.error_messages == {'Authorisation for person PERSON-TEST failed: ' - 'must by authenticated by one of: TEST-MNT, TEST-NEW-MNT'} + 'must be authenticated by one of: TEST-MNT, TEST-NEW-MNT'} validator.passwords = [SAMPLE_MNTNER_CRYPT] result = validator.process_auth(person_new, person_old) assert not result.is_valid() assert result.error_messages == {'Authorisation for person PERSON-TEST failed: ' - 'must by authenticated by one of: TEST-MNT, TEST-OLD-MNT'} + 'must be authenticated by one of: TEST-MNT, TEST-OLD-MNT'} # TODO: the preapprove function needs a bunch of refactoring for this to work # def test_valid_new_person_preapproved_mntner(self, prepare_mocks): @@ -244,7 +244,7 @@ def test_modify_mntner(self, prepare_mocks, config_override): 'provided for authentication.' } - # # This is a multi password submission which is rejected + # # This is a multi password submission with dummy hashes which is rejected validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT] new_mntner = rpsl_object_from_text(mntner_no_auth_hashes) result = validator.process_auth(new_mntner, mntner) @@ -254,3 +254,195 @@ def test_modify_mntner(self, prepare_mocks, config_override): 'Object submitted with dummy hash values, but multiple or no passwords ' 'submitted. Either submit only full hashes, or a single password.' } + + +class TestAuthValidatorRelatedObjects: + def test_related_route_exact_inetnum(self, prepare_mocks, config_override): + validator, mock_dq, mock_dh = prepare_mocks + route = rpsl_object_from_text(SAMPLE_ROUTE) + query_results = itertools.cycle([ + [{'object_text': SAMPLE_MNTNER.replace('MD5', '')}], # mntner for object + [{ + # attempt to look for exact inetnum + 'object_class': 'inetnum', + 'rpsl_pk': '192.0.2.0-192.0.2.255', + 'parsed_data': {'mnt-by': ['RELATED-MNT']} + }], + [{'object_text': SAMPLE_MNTNER.replace('CRYPT', '')}], # related mntner retrieval + ]) + mock_dh.execute_query = lambda q: next(query_results) + + validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT] + result = validator.process_auth(route, None) + assert result.is_valid() + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT'},), {}], + ['sources', (['TEST'],), {}], + + ['object_classes', (['inetnum'],), {}], + ['first_only', (), {}], + ['ip_exact', ('192.0.2.0/24',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'RELATED-MNT'},), {}] + ] + + validator = AuthValidator(mock_dh, None) + validator.passwords = [SAMPLE_MNTNER_CRYPT] # related only has MD5, so this is invalid + result = validator.process_auth(route, None) + assert not result.is_valid() + assert result.error_messages == { + 'Authorisation for route 192.0.2.0/24AS65537 failed: must be authenticated by one of: ' + 'RELATED-MNT - from parent inetnum 192.0.2.0-192.0.2.255' + } + + result = validator.process_auth(route, route) + assert result.is_valid() + + def test_related_route_less_specific_inetnum(self, prepare_mocks, config_override): + validator, mock_dq, mock_dh = prepare_mocks + route = rpsl_object_from_text(SAMPLE_ROUTE) + query_results = itertools.cycle([ + [{'object_text': SAMPLE_MNTNER.replace('MD5', '')}], # mntner for object + [], # attempt to look for exact inetnum + [{ + # attempt to look for one level less specific inetnum + 'object_class': 'inetnum', + 'rpsl_pk': '192.0.2.0-192.0.2.255', + 'parsed_data': {'mnt-by': ['RELATED-MNT']} + }], + [{'object_text': SAMPLE_MNTNER.replace('CRYPT', '')}], # related mntner retrieval + ]) + mock_dh.execute_query = lambda q: next(query_results) + + validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT] + result = validator.process_auth(route, None) + assert result.is_valid() + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT'},), {}], + ['sources', (['TEST'],), {}], + + ['object_classes', (['inetnum'],), {}], + ['first_only', (), {}], + ['ip_exact', ('192.0.2.0/24',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['inetnum'],), {}], + ['first_only', (), {}], + ['ip_less_specific_one_level', ('192.0.2.0/24',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'RELATED-MNT'},), {}] + ] + + validator = AuthValidator(mock_dh, None) + validator.passwords = [SAMPLE_MNTNER_CRYPT] # related only has MD5, so this is invalid + result = validator.process_auth(route, None) + assert not result.is_valid() + assert result.error_messages == { + 'Authorisation for route 192.0.2.0/24AS65537 failed: must be authenticated by one of: ' + 'RELATED-MNT - from parent inetnum 192.0.2.0-192.0.2.255' + } + + result = validator.process_auth(route, route) + assert result.is_valid() + + def test_related_route_less_specific_route(self, prepare_mocks, config_override): + validator, mock_dq, mock_dh = prepare_mocks + route = rpsl_object_from_text(SAMPLE_ROUTE) + query_results = itertools.cycle([ + [{'object_text': SAMPLE_MNTNER.replace('MD5', '')}], # mntner for object + [], # attempt to look for exact inetnum + [], # attempt to look for one level less specific inetnum + [{ + # attempt to look for less specific route + 'object_class': 'route', + 'rpsl_pk': '192.0.2.0/24AS65537', + 'parsed_data': {'mnt-by': ['RELATED-MNT']} + }], + [{'object_text': SAMPLE_MNTNER.replace('CRYPT', '')}], # related mntner retrieval + ]) + mock_dh.execute_query = lambda q: next(query_results) + + validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT] + result = validator.process_auth(route, None) + assert result.is_valid() + + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT'},), {}], + ['sources', (['TEST'],), {}], + + ['object_classes', (['inetnum'],), {}], + ['first_only', (), {}], + ['ip_exact', ('192.0.2.0/24',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['inetnum'],), {}], + ['first_only', (), {}], + ['ip_less_specific_one_level', ('192.0.2.0/24',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['route'],), {}], + ['first_only', (), {}], + ['ip_less_specific_one_level', ('192.0.2.0/24',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'RELATED-MNT'},), {}] + ] + + validator = AuthValidator(mock_dh, None) + validator.passwords = [SAMPLE_MNTNER_CRYPT] # related only has MD5, so this is invalid + result = validator.process_auth(route, None) + assert not result.is_valid() + assert result.error_messages == { + 'Authorisation for route 192.0.2.0/24AS65537 failed: must be authenticated by one of: ' + 'RELATED-MNT - from parent route 192.0.2.0/24AS65537' + } + + result = validator.process_auth(route, route) + assert result.is_valid() + + def test_related_route_no_match_v6(self, prepare_mocks, config_override): + validator, mock_dq, mock_dh = prepare_mocks + route = rpsl_object_from_text(SAMPLE_ROUTE6) + query_results = itertools.cycle([ + [{'object_text': SAMPLE_MNTNER}], # mntner for object + [], # attempt to look for exact inetnum + [], # attempt to look for one level less specific inetnum + [], # attempt to look for less specific route + ]) + mock_dh.execute_query = lambda q: next(query_results) + + validator.passwords = [SAMPLE_MNTNER_MD5] + result = validator.process_auth(route, None) + assert result.is_valid() + + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT'},), {}], + ['sources', (['TEST'],), {}], + + ['object_classes', (['inet6num'],), {}], + ['first_only', (), {}], + ['ip_exact', ('2001:db8::/48',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['inet6num'],), {}], + ['first_only', (), {}], + ['ip_less_specific_one_level', ('2001:db8::/48',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['route6'],), {}], + ['first_only', (), {}], + ['ip_less_specific_one_level', ('2001:db8::/48',), {}], + ] diff --git a/irrd/updates/validators.py b/irrd/updates/validators.py index 101655386..ce230ff37 100644 --- a/irrd/updates/validators.py +++ b/irrd/updates/validators.py @@ -296,7 +296,8 @@ def _generate_failure_message(self, result: ValidatorResult, failed_mntner_list: def _find_related_mntners(self, rpsl_obj_new: RPSLObject) -> Optional[Tuple[str, str, List[str]]]: """ Find the maintainers of the related object to rpsl_obj_new, if any. - This is used to authorise creating objects. + This is used to authorise creating objects - authentication may be + required to pass for the related object as well. Returns a tuple of: - object class of the related object @@ -304,35 +305,47 @@ def _find_related_mntners(self, rpsl_obj_new: RPSLObject) -> Optional[Tuple[str, - List of maintainers for the related object (at least one must pass) Returns None of no related objects were found that should be authenticated. """ - if rpsl_obj_new.rpsl_object_class not in ['route', 'route6']: - return None + related_object = None + if rpsl_obj_new.rpsl_object_class in ['route', 'route6']: + related_object = self._find_related_object_route(rpsl_obj_new) + if related_object: + mntners = related_object.get('parsed_data', {}).get('mnt-by', []) + return related_object['object_class'], related_object['rpsl_pk'], mntners + + return None + + def _find_related_object_route(self, rpsl_obj_new: RPSLObject): + """ + Find the related inetnum/route object to rpsl_obj_new, which must be a route(6). + Returns a dict as returned by the database handler. + """ inetnum_class = { 'route': 'inetnum', 'route6': 'inet6num', } - def init_query(rpsl_object_class: str) -> RPSLDatabaseQuery: - query = RPSLDatabaseQuery().sources([rpsl_obj_new.source()]) - query = query.object_classes([rpsl_object_class]) - return query.first_only() - object_class = inetnum_class[rpsl_obj_new.rpsl_object_class] - query = init_query(object_class).ip_exact(rpsl_obj_new.prefix) + query = _init_related_object_query(object_class, rpsl_obj_new).ip_exact(rpsl_obj_new.prefix) inetnums = list(self.database_handler.execute_query(query)) + if not inetnums: - query = init_query(object_class).ip_less_specific_one_level(rpsl_obj_new.prefix) + query = _init_related_object_query(object_class, rpsl_obj_new).ip_less_specific_one_level(rpsl_obj_new.prefix) inetnums = list(self.database_handler.execute_query(query)) if inetnums: - mntners = inetnums[0].get('parsed_data', {}).get('mnt-by', []) - return inetnums[0]['object_class'], inetnums[0]['rpsl_pk'], mntners + return inetnums[0] object_class = rpsl_obj_new.rpsl_object_class - query = init_query(object_class).ip_less_specific_one_level(rpsl_obj_new.prefix) + query = _init_related_object_query(object_class, rpsl_obj_new).ip_less_specific_one_level(rpsl_obj_new.prefix) routes = list(self.database_handler.execute_query(query)) if routes: - mntners = routes[0].get('parsed_data', {}).get('mnt-by', []) - return routes[0]['object_class'], routes[0]['rpsl_pk'], mntners + return routes[0] return None + + +def _init_related_object_query(rpsl_object_class: str, rpsl_obj_new: RPSLObject) -> RPSLDatabaseQuery: + query = RPSLDatabaseQuery().sources([rpsl_obj_new.source()]) + query = query.object_classes([rpsl_object_class]) + return query.first_only()