Skip to content

Commit

Permalink
[IMP] cetmix_tower_server: Key Access tests
Browse files Browse the repository at this point in the history
  • Loading branch information
GabbasovDinar committed Jan 30, 2025
1 parent 4203086 commit 53aa613
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cetmix_tower_server/models/cx_tower_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ def _resolve_key_type_secret(self, reference, **kwargs):
)

# Fetch keys
keys = self.search(key_domain).sudo()
keys = self.sudo().search(key_domain)
if not keys:
return

Expand Down
2 changes: 2 additions & 0 deletions cetmix_tower_server/security/cx_tower_key_security.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
[
"|",
"&",
"&",
("server_id", "=", False),
("server_ssh_ids", "=", False),
("partner_id", "=", False),
"|",
"|",
("server_id.message_partner_ids", "in", [user.partner_id.id]),
Expand Down
43 changes: 43 additions & 0 deletions cetmix_tower_server/tests/test_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,49 @@ def test_user_access_rule(self):
msg="Command name should be same",
)

def test_user_access_rule_with_keys(self):
"""Test user access rule"""

# create server key
server_key = self.Key.create(
{
"name": "server key",
"secret_value": "server key value",
"key_type": "s",
"server_id": self.server_test_1.id,
}
)

# Create the test command with server key
code = f"mkdir {server_key.reference_code}"
command_with_key = self.Command.create(
{"name": "Command with key", "code": code, "access_level": "1"}
)

# Remove bob from all cxtower_server groups
self.remove_from_group(
self.user_bob,
[
"cetmix_tower_server.group_user",
"cetmix_tower_server.group_manager",
"cetmix_tower_server.group_root",
],
)

# Add user to group
self.add_to_group(self.user_bob, "cetmix_tower_server.group_user")
# User can execute command with key
self.server_test_1.with_user(self.user_bob).execute_command(
command_with_key,
)

# Add user to group_manager
self.add_to_group(self.user_bob, "cetmix_tower_server.group_manager")
# Manager can execute command with key
self.server_test_1.with_user(self.user_bob).execute_command(
command_with_key,
)

def test_parse_ssh_command_result(self):
"""Test ssh command result parsing"""

Expand Down
99 changes: 93 additions & 6 deletions cetmix_tower_server/tests/test_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,24 @@ def test_key_creation(self):
"Trailing and leading whitespaces must be removed from name",
)

def test_key_access_rights(self):
"""Test private key security features"""
def test_global_key_access_rights(self):
"""Test global private key security features"""

# Store key value
# Store global key value
self.write_and_invalidate(self.key_1, **{"secret_value": "pepe"})

# Get key value as Bob
key_bob = self.key_1.with_user(self.user_bob)

with self.assertRaises(AccessError):
key_value = key_bob.secret_value
key_value = key_bob.read(["secret_value"])

# Add user to group
self.add_to_group(self.user_bob, "cetmix_tower_server.group_user")

with self.assertRaises(AccessError):
# Get value
key_value = key_bob.secret_value
key_value = key_bob.read(["secret_value"])

# Test write
with self.assertRaises(AccessError):
Expand All @@ -63,10 +63,97 @@ def test_key_access_rights(self):
with self.assertRaises(AccessError):
key_bob.unlink()

# Add Bob to Root group and test write again
# Add Bob to Root group and test delete again
self.add_to_group(self.user_bob, "cetmix_tower_server.group_root")
key_bob.unlink()

def test_server_key_access_rights(self):
"""Test server private key security features"""

# Store server key value
self.write_and_invalidate(
self.key_1, **{"secret_value": "pepe", "server_id": self.server_test_1.id}
)

# Get key value as Bob
key_bob = self.key_1.with_user(self.user_bob)

# Add user to group
self.add_to_group(self.user_bob, "cetmix_tower_server.group_user")

with self.assertRaises(AccessError):
# Get value
key_bob.read(["secret_value"])

# Test write
with self.assertRaises(AccessError):
self.write_and_invalidate(key_bob, **{"secret_value": "frog"})

# Add Bob as subscriber to server
self.server_test_1.message_subscribe([self.user_bob.partner_id.id])

# User cannot read key value
with self.assertRaises(AccessError):
key_bob.read(["secret_value"])

# Remove Bob from server followers
self.server_test_1.message_unsubscribe([self.user_bob.partner_id.id])

# Add Bob to Manager group
self.add_to_group(self.user_bob, "cetmix_tower_server.group_manager")

# Manager cannot read key value
with self.assertRaises(AccessError):
key_bob.read(["secret_value"])

# Add Bob as subscriber to server
self.server_test_1.message_subscribe([self.user_bob.partner_id.id])

# Now manager server subscriber can read and update this key
key_bob.read(["secret_value"])
key_bob.write({"secret_value": "dog"})

def test_partner_key_access_rights(self):
"""Test partner private key security features"""
# create test partner
partner = self.env["res.partner"].create({"name": "test partner"})

# update server for this partner
self.write_and_invalidate(self.server_test_1, **{"partner_id": partner.id})

# Store key partner
self.write_and_invalidate(self.key_1, **{"partner_id": partner.id})

# Get key value as Bob
key_bob = self.key_1.with_user(self.user_bob)

# Add user to group
self.add_to_group(self.user_bob, "cetmix_tower_server.group_user")

# Ensure that user don't has access to key
with self.assertRaises(AccessError):
# Get value
key_bob.read(["secret_value"])

# Add Bob to Manager group
self.add_to_group(self.user_bob, "cetmix_tower_server.group_manager")

# Ensure that manager don't has access to key
with self.assertRaises(AccessError):
# Get value
key_bob.read(["secret_value"])

# Ensure that key has no servers
self.assertFalse(self.key_1.server_id)
self.assertFalse(self.key_1.server_ssh_ids)

# Add Bob as subscriber to server
partner.server_ids.message_subscribe([self.user_bob.partner_id.id])

# Now manager server subscriber can read and update this key
key_bob.read(["secret_value"])
key_bob.write({"secret_value": "dog"})

def test_extract_key_strings(self):
"""Check if key strings are extracted properly"""
code = (
Expand Down

0 comments on commit 53aa613

Please sign in to comment.