Skip to content

Commit

Permalink
Add tests for related object validation
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsasha committed Nov 4, 2021
1 parent 5e1ea36 commit 983590b
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 19 deletions.
200 changes: 196 additions & 4 deletions irrd/updates/tests/test_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from irrd.scopefilter.validators import ScopeFilterValidator
from irrd.storage.models import JournalEntryOrigin
from irrd.utils.rpsl_samples import SAMPLE_INETNUM, SAMPLE_AS_SET, SAMPLE_MNTNER_CRYPT, SAMPLE_PERSON, SAMPLE_MNTNER, \
SAMPLE_ROUTE, SAMPLE_MNTNER_MD5
SAMPLE_ROUTE, SAMPLE_MNTNER_MD5, SAMPLE_ROUTE6
from irrd.utils.test_utils import flatten_mock_calls
from irrd.utils.text import splitline_unicodesafe, remove_auth_hashes
from ..parser import parse_change_requests
Expand Down Expand Up @@ -155,13 +155,13 @@ def test_existing_person_mntner_change(self, prepare_mocks):
assert not result.is_valid()
print(result.error_messages)
assert result.error_messages == {'Authorisation for person PERSON-TEST failed: '
'must by authenticated by one of: TEST-MNT, TEST-NEW-MNT'}
'must be authenticated by one of: TEST-MNT, TEST-NEW-MNT'}

validator.passwords = [SAMPLE_MNTNER_CRYPT]
result = validator.process_auth(person_new, person_old)
assert not result.is_valid()
assert result.error_messages == {'Authorisation for person PERSON-TEST failed: '
'must by authenticated by one of: TEST-MNT, TEST-OLD-MNT'}
'must be authenticated by one of: TEST-MNT, TEST-OLD-MNT'}

# TODO: the preapprove function needs a bunch of refactoring for this to work
# def test_valid_new_person_preapproved_mntner(self, prepare_mocks):
Expand Down Expand Up @@ -244,7 +244,7 @@ def test_modify_mntner(self, prepare_mocks, config_override):
'provided for authentication.'
}

# # This is a multi password submission which is rejected
# # This is a multi password submission with dummy hashes which is rejected
validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT]
new_mntner = rpsl_object_from_text(mntner_no_auth_hashes)
result = validator.process_auth(new_mntner, mntner)
Expand All @@ -254,3 +254,195 @@ def test_modify_mntner(self, prepare_mocks, config_override):
'Object submitted with dummy hash values, but multiple or no passwords '
'submitted. Either submit only full hashes, or a single password.'
}


class TestAuthValidatorRelatedObjects:
def test_related_route_exact_inetnum(self, prepare_mocks, config_override):
validator, mock_dq, mock_dh = prepare_mocks
route = rpsl_object_from_text(SAMPLE_ROUTE)
query_results = itertools.cycle([
[{'object_text': SAMPLE_MNTNER.replace('MD5', '')}], # mntner for object
[{
# attempt to look for exact inetnum
'object_class': 'inetnum',
'rpsl_pk': '192.0.2.0-192.0.2.255',
'parsed_data': {'mnt-by': ['RELATED-MNT']}
}],
[{'object_text': SAMPLE_MNTNER.replace('CRYPT', '')}], # related mntner retrieval
])
mock_dh.execute_query = lambda q: next(query_results)

validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT]
result = validator.process_auth(route, None)
assert result.is_valid()
assert flatten_mock_calls(mock_dq, flatten_objects=True) == [
['sources', (['TEST'],), {}],
['object_classes', (['mntner'],), {}],
['rpsl_pks', ({'TEST-MNT'},), {}],
['sources', (['TEST'],), {}],

['object_classes', (['inetnum'],), {}],
['first_only', (), {}],
['ip_exact', ('192.0.2.0/24',), {}],

['sources', (['TEST'],), {}],
['object_classes', (['mntner'],), {}],
['rpsl_pks', ({'RELATED-MNT'},), {}]
]

validator = AuthValidator(mock_dh, None)
validator.passwords = [SAMPLE_MNTNER_CRYPT] # related only has MD5, so this is invalid
result = validator.process_auth(route, None)
assert not result.is_valid()
assert result.error_messages == {
'Authorisation for route 192.0.2.0/24AS65537 failed: must be authenticated by one of: '
'RELATED-MNT - from parent inetnum 192.0.2.0-192.0.2.255'
}

result = validator.process_auth(route, route)
assert result.is_valid()

def test_related_route_less_specific_inetnum(self, prepare_mocks, config_override):
validator, mock_dq, mock_dh = prepare_mocks
route = rpsl_object_from_text(SAMPLE_ROUTE)
query_results = itertools.cycle([
[{'object_text': SAMPLE_MNTNER.replace('MD5', '')}], # mntner for object
[], # attempt to look for exact inetnum
[{
# attempt to look for one level less specific inetnum
'object_class': 'inetnum',
'rpsl_pk': '192.0.2.0-192.0.2.255',
'parsed_data': {'mnt-by': ['RELATED-MNT']}
}],
[{'object_text': SAMPLE_MNTNER.replace('CRYPT', '')}], # related mntner retrieval
])
mock_dh.execute_query = lambda q: next(query_results)

validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT]
result = validator.process_auth(route, None)
assert result.is_valid()
assert flatten_mock_calls(mock_dq, flatten_objects=True) == [
['sources', (['TEST'],), {}],
['object_classes', (['mntner'],), {}],
['rpsl_pks', ({'TEST-MNT'},), {}],
['sources', (['TEST'],), {}],

['object_classes', (['inetnum'],), {}],
['first_only', (), {}],
['ip_exact', ('192.0.2.0/24',), {}],

['sources', (['TEST'],), {}],
['object_classes', (['inetnum'],), {}],
['first_only', (), {}],
['ip_less_specific_one_level', ('192.0.2.0/24',), {}],

['sources', (['TEST'],), {}],
['object_classes', (['mntner'],), {}],
['rpsl_pks', ({'RELATED-MNT'},), {}]
]

validator = AuthValidator(mock_dh, None)
validator.passwords = [SAMPLE_MNTNER_CRYPT] # related only has MD5, so this is invalid
result = validator.process_auth(route, None)
assert not result.is_valid()
assert result.error_messages == {
'Authorisation for route 192.0.2.0/24AS65537 failed: must be authenticated by one of: '
'RELATED-MNT - from parent inetnum 192.0.2.0-192.0.2.255'
}

result = validator.process_auth(route, route)
assert result.is_valid()

def test_related_route_less_specific_route(self, prepare_mocks, config_override):
validator, mock_dq, mock_dh = prepare_mocks
route = rpsl_object_from_text(SAMPLE_ROUTE)
query_results = itertools.cycle([
[{'object_text': SAMPLE_MNTNER.replace('MD5', '')}], # mntner for object
[], # attempt to look for exact inetnum
[], # attempt to look for one level less specific inetnum
[{
# attempt to look for less specific route
'object_class': 'route',
'rpsl_pk': '192.0.2.0/24AS65537',
'parsed_data': {'mnt-by': ['RELATED-MNT']}
}],
[{'object_text': SAMPLE_MNTNER.replace('CRYPT', '')}], # related mntner retrieval
])
mock_dh.execute_query = lambda q: next(query_results)

validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT]
result = validator.process_auth(route, None)
assert result.is_valid()

assert flatten_mock_calls(mock_dq, flatten_objects=True) == [
['sources', (['TEST'],), {}],
['object_classes', (['mntner'],), {}],
['rpsl_pks', ({'TEST-MNT'},), {}],
['sources', (['TEST'],), {}],

['object_classes', (['inetnum'],), {}],
['first_only', (), {}],
['ip_exact', ('192.0.2.0/24',), {}],

['sources', (['TEST'],), {}],
['object_classes', (['inetnum'],), {}],
['first_only', (), {}],
['ip_less_specific_one_level', ('192.0.2.0/24',), {}],

['sources', (['TEST'],), {}],
['object_classes', (['route'],), {}],
['first_only', (), {}],
['ip_less_specific_one_level', ('192.0.2.0/24',), {}],

['sources', (['TEST'],), {}],
['object_classes', (['mntner'],), {}],
['rpsl_pks', ({'RELATED-MNT'},), {}]
]

validator = AuthValidator(mock_dh, None)
validator.passwords = [SAMPLE_MNTNER_CRYPT] # related only has MD5, so this is invalid
result = validator.process_auth(route, None)
assert not result.is_valid()
assert result.error_messages == {
'Authorisation for route 192.0.2.0/24AS65537 failed: must be authenticated by one of: '
'RELATED-MNT - from parent route 192.0.2.0/24AS65537'
}

result = validator.process_auth(route, route)
assert result.is_valid()

def test_related_route_no_match_v6(self, prepare_mocks, config_override):
validator, mock_dq, mock_dh = prepare_mocks
route = rpsl_object_from_text(SAMPLE_ROUTE6)
query_results = itertools.cycle([
[{'object_text': SAMPLE_MNTNER}], # mntner for object
[], # attempt to look for exact inetnum
[], # attempt to look for one level less specific inetnum
[], # attempt to look for less specific route
])
mock_dh.execute_query = lambda q: next(query_results)

validator.passwords = [SAMPLE_MNTNER_MD5]
result = validator.process_auth(route, None)
assert result.is_valid()

assert flatten_mock_calls(mock_dq, flatten_objects=True) == [
['sources', (['TEST'],), {}],
['object_classes', (['mntner'],), {}],
['rpsl_pks', ({'TEST-MNT'},), {}],
['sources', (['TEST'],), {}],

['object_classes', (['inet6num'],), {}],
['first_only', (), {}],
['ip_exact', ('2001:db8::/48',), {}],

['sources', (['TEST'],), {}],
['object_classes', (['inet6num'],), {}],
['first_only', (), {}],
['ip_less_specific_one_level', ('2001:db8::/48',), {}],

['sources', (['TEST'],), {}],
['object_classes', (['route6'],), {}],
['first_only', (), {}],
['ip_less_specific_one_level', ('2001:db8::/48',), {}],
]
43 changes: 28 additions & 15 deletions irrd/updates/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,43 +296,56 @@ def _generate_failure_message(self, result: ValidatorResult, failed_mntner_list:
def _find_related_mntners(self, rpsl_obj_new: RPSLObject) -> Optional[Tuple[str, str, List[str]]]:
"""
Find the maintainers of the related object to rpsl_obj_new, if any.
This is used to authorise creating objects.
This is used to authorise creating objects - authentication may be
required to pass for the related object as well.
Returns a tuple of:
- object class of the related object
- PK of the related object
- List of maintainers for the related object (at least one must pass)
Returns None of no related objects were found that should be authenticated.
"""
if rpsl_obj_new.rpsl_object_class not in ['route', 'route6']:
return None
related_object = None
if rpsl_obj_new.rpsl_object_class in ['route', 'route6']:
related_object = self._find_related_object_route(rpsl_obj_new)

if related_object:
mntners = related_object.get('parsed_data', {}).get('mnt-by', [])
return related_object['object_class'], related_object['rpsl_pk'], mntners

return None

def _find_related_object_route(self, rpsl_obj_new: RPSLObject):
"""
Find the related inetnum/route object to rpsl_obj_new, which must be a route(6).
Returns a dict as returned by the database handler.
"""
inetnum_class = {
'route': 'inetnum',
'route6': 'inet6num',
}

def init_query(rpsl_object_class: str) -> RPSLDatabaseQuery:
query = RPSLDatabaseQuery().sources([rpsl_obj_new.source()])
query = query.object_classes([rpsl_object_class])
return query.first_only()

object_class = inetnum_class[rpsl_obj_new.rpsl_object_class]
query = init_query(object_class).ip_exact(rpsl_obj_new.prefix)
query = _init_related_object_query(object_class, rpsl_obj_new).ip_exact(rpsl_obj_new.prefix)
inetnums = list(self.database_handler.execute_query(query))

if not inetnums:
query = init_query(object_class).ip_less_specific_one_level(rpsl_obj_new.prefix)
query = _init_related_object_query(object_class, rpsl_obj_new).ip_less_specific_one_level(rpsl_obj_new.prefix)
inetnums = list(self.database_handler.execute_query(query))

if inetnums:
mntners = inetnums[0].get('parsed_data', {}).get('mnt-by', [])
return inetnums[0]['object_class'], inetnums[0]['rpsl_pk'], mntners
return inetnums[0]

object_class = rpsl_obj_new.rpsl_object_class
query = init_query(object_class).ip_less_specific_one_level(rpsl_obj_new.prefix)
query = _init_related_object_query(object_class, rpsl_obj_new).ip_less_specific_one_level(rpsl_obj_new.prefix)
routes = list(self.database_handler.execute_query(query))
if routes:
mntners = routes[0].get('parsed_data', {}).get('mnt-by', [])
return routes[0]['object_class'], routes[0]['rpsl_pk'], mntners
return routes[0]

return None


def _init_related_object_query(rpsl_object_class: str, rpsl_obj_new: RPSLObject) -> RPSLDatabaseQuery:
query = RPSLDatabaseQuery().sources([rpsl_obj_new.source()])
query = query.object_classes([rpsl_object_class])
return query.first_only()

0 comments on commit 983590b

Please sign in to comment.