diff --git a/cetmix_tower_server/models/cx_tower_key.py b/cetmix_tower_server/models/cx_tower_key.py index 0435fb44..a9307d93 100644 --- a/cetmix_tower_server/models/cx_tower_key.py +++ b/cetmix_tower_server/models/cx_tower_key.py @@ -476,7 +476,7 @@ def _resolve_key_type_secret(self, reference, **kwargs): ) # Fetch keys - keys = self.search(key_domain).sudo() + keys = self.sudo().search(key_domain) if not keys: return diff --git a/cetmix_tower_server/security/cx_tower_key_security.xml b/cetmix_tower_server/security/cx_tower_key_security.xml index f3134c9b..7836029b 100644 --- a/cetmix_tower_server/security/cx_tower_key_security.xml +++ b/cetmix_tower_server/security/cx_tower_key_security.xml @@ -8,8 +8,10 @@ [ "|", "&", + "&", ("server_id", "=", False), ("server_ssh_ids", "=", False), + ("partner_id", "=", False), "|", "|", ("server_id.message_partner_ids", "in", [user.partner_id.id]), diff --git a/cetmix_tower_server/tests/test_command.py b/cetmix_tower_server/tests/test_command.py index 7438e5c8..ddd6d198 100644 --- a/cetmix_tower_server/tests/test_command.py +++ b/cetmix_tower_server/tests/test_command.py @@ -645,6 +645,49 @@ def test_user_access_rule(self): msg="Command name should be same", ) + def test_user_access_rule_with_keys(self): + """Test user access rule""" + + # create server key + server_key = self.Key.create( + { + "name": "server key", + "secret_value": "server key value", + "key_type": "s", + "server_id": self.server_test_1.id, + } + ) + + # Create the test command with server key + code = f"mkdir {server_key.reference_code}" + command_with_key = self.Command.create( + {"name": "Command with key", "code": code, "access_level": "1"} + ) + + # Remove bob from all cxtower_server groups + self.remove_from_group( + self.user_bob, + [ + "cetmix_tower_server.group_user", + "cetmix_tower_server.group_manager", + "cetmix_tower_server.group_root", + ], + ) + + # Add user to group + self.add_to_group(self.user_bob, "cetmix_tower_server.group_user") + # User can execute command with key + self.server_test_1.with_user(self.user_bob).execute_command( + command_with_key, + ) + + # Add user to group_manager + self.add_to_group(self.user_bob, "cetmix_tower_server.group_manager") + # Manager can execute command with key + self.server_test_1.with_user(self.user_bob).execute_command( + command_with_key, + ) + def test_parse_ssh_command_result(self): """Test ssh command result parsing""" diff --git a/cetmix_tower_server/tests/test_key.py b/cetmix_tower_server/tests/test_key.py index 5cb73760..f856253a 100644 --- a/cetmix_tower_server/tests/test_key.py +++ b/cetmix_tower_server/tests/test_key.py @@ -27,24 +27,24 @@ def test_key_creation(self): "Trailing and leading whitespaces must be removed from name", ) - def test_key_access_rights(self): - """Test private key security features""" + def test_global_key_access_rights(self): + """Test global private key security features""" - # Store key value + # Store global key value self.write_and_invalidate(self.key_1, **{"secret_value": "pepe"}) # Get key value as Bob key_bob = self.key_1.with_user(self.user_bob) with self.assertRaises(AccessError): - key_value = key_bob.secret_value + key_value = key_bob.read(["secret_value"]) # Add user to group self.add_to_group(self.user_bob, "cetmix_tower_server.group_user") with self.assertRaises(AccessError): # Get value - key_value = key_bob.secret_value + key_value = key_bob.read(["secret_value"]) # Test write with self.assertRaises(AccessError): @@ -63,10 +63,97 @@ def test_key_access_rights(self): with self.assertRaises(AccessError): key_bob.unlink() - # Add Bob to Root group and test write again + # Add Bob to Root group and test delete again self.add_to_group(self.user_bob, "cetmix_tower_server.group_root") key_bob.unlink() + def test_server_key_access_rights(self): + """Test server private key security features""" + + # Store server key value + self.write_and_invalidate( + self.key_1, **{"secret_value": "pepe", "server_id": self.server_test_1.id} + ) + + # Get key value as Bob + key_bob = self.key_1.with_user(self.user_bob) + + # Add user to group + self.add_to_group(self.user_bob, "cetmix_tower_server.group_user") + + with self.assertRaises(AccessError): + # Get value + key_bob.read(["secret_value"]) + + # Test write + with self.assertRaises(AccessError): + self.write_and_invalidate(key_bob, **{"secret_value": "frog"}) + + # Add Bob as subscriber to server + self.server_test_1.message_subscribe([self.user_bob.partner_id.id]) + + # User cannot read key value + with self.assertRaises(AccessError): + key_bob.read(["secret_value"]) + + # Remove Bob from server followers + self.server_test_1.message_unsubscribe([self.user_bob.partner_id.id]) + + # Add Bob to Manager group + self.add_to_group(self.user_bob, "cetmix_tower_server.group_manager") + + # Manager cannot read key value + with self.assertRaises(AccessError): + key_bob.read(["secret_value"]) + + # Add Bob as subscriber to server + self.server_test_1.message_subscribe([self.user_bob.partner_id.id]) + + # Now manager server subscriber can read and update this key + key_bob.read(["secret_value"]) + key_bob.write({"secret_value": "dog"}) + + def test_partner_key_access_rights(self): + """Test partner private key security features""" + # create test partner + partner = self.env["res.partner"].create({"name": "test partner"}) + + # update server for this partner + self.write_and_invalidate(self.server_test_1, **{"partner_id": partner.id}) + + # Store key partner + self.write_and_invalidate(self.key_1, **{"partner_id": partner.id}) + + # Get key value as Bob + key_bob = self.key_1.with_user(self.user_bob) + + # Add user to group + self.add_to_group(self.user_bob, "cetmix_tower_server.group_user") + + # Ensure that user don't has access to key + with self.assertRaises(AccessError): + # Get value + key_bob.read(["secret_value"]) + + # Add Bob to Manager group + self.add_to_group(self.user_bob, "cetmix_tower_server.group_manager") + + # Ensure that manager don't has access to key + with self.assertRaises(AccessError): + # Get value + key_bob.read(["secret_value"]) + + # Ensure that key has no servers + self.assertFalse(self.key_1.server_id) + self.assertFalse(self.key_1.server_ssh_ids) + + # Add Bob as subscriber to server + partner.server_ids.message_subscribe([self.user_bob.partner_id.id]) + + # Now manager server subscriber can read and update this key + key_bob.read(["secret_value"]) + key_bob.write({"secret_value": "dog"}) + def test_extract_key_strings(self): """Check if key strings are extracted properly""" code = (