Skip to content

Commit

Permalink
permission api: use _keystore.get_* functions (#194)
Browse files Browse the repository at this point in the history
Proof that we're still completely missing test coverage of some verbs.
Add a basic `create_permissions` test as well.

Signed-off-by: Kyle Fazzari <[email protected]>
  • Loading branch information
kyrofa authored Apr 9, 2020
1 parent 1984821 commit 84dc386
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 3 deletions.
6 changes: 3 additions & 3 deletions sros2/sros2/api/_permission.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ def create_permission(keystore_path, identity, policy_file_path):

def create_permissions_from_policy_element(keystore_path, identity, policy_element):
relative_path = os.path.normpath(identity.lstrip('/'))
key_dir = os.path.join(_keystore.keystore_context_dir(keystore_path), relative_path)
key_dir = os.path.join(_keystore.get_keystore_context_dir(keystore_path), relative_path)
print("creating permission file for identity: '%s'" % identity)
permissions_path = os.path.join(key_dir, 'permissions.xml')
create_permission_file(permissions_path, _utilities.domain_id(), policy_element)

signed_permissions_path = os.path.join(key_dir, 'permissions.p7s')
keystore_ca_cert_path = os.path.join(
_keystore.keystore_public_dir(keystore_path), 'ca.cert.pem')
_keystore.get_keystore_public_dir(keystore_path), 'ca.cert.pem')
keystore_ca_key_path = os.path.join(
_keystore.keystore_private_dir(keystore_path), 'ca.key.pem')
_keystore.get_keystore_private_dir(keystore_path), 'ca.key.pem')
_utilities.create_smime_signed_file(
keystore_ca_cert_path, keystore_ca_key_path, permissions_path, signed_permissions_path)

Expand Down
8 changes: 8 additions & 0 deletions sros2/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@
# limitations under the License.

import os
import pathlib

import pytest

# Disable lxml2 lookup of resources on the internet by configuring it to use a proxy
# that does not exist
os.environ['HTTP_PROXY'] = 'http://example.com'


@pytest.fixture('session')
def test_policy_dir():
return pathlib.Path(__file__).parent / 'policies'
98 changes: 98 additions & 0 deletions sros2/test/sros2/commands/security/verbs/test_create_permission.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Copyright 2020 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pathlib

import lxml

import pytest

import rclpy
from ros2cli import cli
from sros2.api import _key, _keystore, _permission
from sros2.policy import get_transport_schema


_test_identity = '/talker_listener/talker'


# This fixture will run once for the entire module (as opposed to once per test)
@pytest.fixture(scope='module')
def security_context_dir(tmpdir_factory, test_policy_dir) -> pathlib.Path:
keystore_dir = pathlib.Path(str(tmpdir_factory.mktemp('keystore')))

# First, create the keystore as well as a keypair for the talker
assert _keystore.create_keystore(keystore_dir)
assert _key.create_key(keystore_dir, _test_identity)

security_files_dir = keystore_dir.joinpath(f'contexts{_test_identity}')
assert security_files_dir.is_dir()

# Now using that keystore, create a permissions file using the sample policy
policy_file_path = test_policy_dir / 'sample.policy.xml'
assert cli.main(
argv=[
'security', 'create_permission', str(keystore_dir), _test_identity,
str(policy_file_path)]) == 0

# Return path to directory containing the identity's files
return security_files_dir


def test_create_permission(security_context_dir):
assert security_context_dir.joinpath('permissions.xml').is_file()
assert security_context_dir.joinpath('permissions.p7s').is_file()

tree = lxml.etree.parse(str(security_context_dir.joinpath('permissions.xml')))

# Validate the schema
permissions_xsd_path = get_transport_schema('dds', 'permissions.xsd')
permissions_xsd = lxml.etree.XMLSchema(lxml.etree.parse(permissions_xsd_path))
permissions_xsd.assertValid(tree)

dds = tree.getroot()
assert dds.tag == 'dds'

permissions = list(dds.iterchildren(tag='permissions'))
assert len(permissions) == 1

grants = list(permissions[0].iterchildren(tag='grant'))
assert len(grants) == 1
assert grants[0].get('name') == _test_identity

allow_rules = list(grants[0].iterchildren(tag='allow_rule'))
if rclpy.get_rmw_implementation_identifier() in _permission._RMW_WITH_ROS_GRAPH_INFO_TOPIC:
assert len(allow_rules) == 2
else:
assert len(allow_rules) == 1

publish_rules = list(allow_rules[0].iterchildren(tag='publish'))
assert len(publish_rules) == 1

subscribe_rules = list(allow_rules[0].iterchildren(tag='subscribe'))
assert len(subscribe_rules) == 1

published_topics_set = list(publish_rules[0].iterchildren(tag='topics'))
assert len(published_topics_set) == 1
published_topics = [c.text for c in published_topics_set[0].iterchildren(tag='topic')]
assert len(published_topics) > 0

subscribed_topics_set = list(subscribe_rules[0].iterchildren(tag='topics'))
assert len(subscribed_topics_set) == 1
subscribed_topics = [c.text for c in subscribed_topics_set[0].iterchildren(tag='topic')]
assert len(subscribed_topics) > 0

# Verify that publication is allowed on chatter, but not subscription
assert 'rt/chatter' in published_topics
assert 'rt/chatter' not in subscribed_topics

0 comments on commit 84dc386

Please sign in to comment.