Skip to content

Commit

Permalink
Merge PR #182 into 14.0-dev
Browse files Browse the repository at this point in the history
Signed-off-by ivs-cetmix
  • Loading branch information
CetmixGitBot committed Jan 23, 2025
2 parents d8ca2be + 67e1ca2 commit 162d63f
Show file tree
Hide file tree
Showing 11 changed files with 333 additions and 246 deletions.
7 changes: 5 additions & 2 deletions cetmix_tower_server/models/cx_tower_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class CxTowerFile(models.Model):
name = fields.Char(help="File name WITHOUT path. Eg 'test.txt'")
rendered_name = fields.Char(
compute="_compute_render",
compute_sudo=True,
)
template_id = fields.Many2one(
"cx.tower.file.template",
Expand All @@ -57,6 +58,7 @@ class CxTowerFile(models.Model):
)
rendered_server_dir = fields.Char(
compute="_compute_render",
compute_sudo=True,
)
full_server_path = fields.Char(
compute="_compute_full_server_path",
Expand Down Expand Up @@ -117,6 +119,7 @@ class CxTowerFile(models.Model):
)
rendered_code = fields.Char(
compute="_compute_render",
compute_sudo=True,
help="File content with variables rendered",
)
keep_when_deleted = fields.Boolean(
Expand Down Expand Up @@ -644,7 +647,7 @@ def _process(self, action, raise_error=False):
)
else:
return False
file.server_response = "ok"
file.sudo().server_response = "ok"
except Exception as error:
if raise_error:
raise ValidationError(
Expand Down Expand Up @@ -698,4 +701,4 @@ def _update_file_sync_date(self, last_sync_date):
)
if file.server_response == "ok":
vals.update({"sync_date_last": last_sync_date})
file.write(vals)
file.sudo().write(vals)
42 changes: 31 additions & 11 deletions cetmix_tower_server/models/cx_tower_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,29 @@ class CxTowerServer(models.Model):
)

# ---- Connection
ip_v4_address = fields.Char(string="IPv4 Address")
ip_v6_address = fields.Char(string="IPv6 Address")
ssh_port = fields.Char(string="SSH port", required=True, default="22")
ssh_username = fields.Char(string="SSH Username", required=True)
ssh_password = fields.Char(string="SSH Password")
ip_v4_address = fields.Char(
string="IPv4 Address", groups="cetmix_tower_server.group_manager"
)
ip_v6_address = fields.Char(
string="IPv6 Address", groups="cetmix_tower_server.group_manager"
)
ssh_port = fields.Char(
string="SSH port",
required=True,
default="22",
groups="cetmix_tower_server.group_manager",
)
ssh_username = fields.Char(
string="SSH Username", required=True, groups="cetmix_tower_server.group_manager"
)
ssh_password = fields.Char(
string="SSH Password", groups="cetmix_tower_server.group_manager"
)
ssh_key_id = fields.Many2one(
comodel_name="cx.tower.key",
string="SSH Private Key",
domain=[("key_type", "=", "k")],
groups="cetmix_tower_server.group_manager",
)
ssh_auth_mode = fields.Selection(
string="SSH Auth Mode",
Expand All @@ -298,11 +312,13 @@ class CxTowerServer(models.Model):
],
default="p",
required=True,
groups="cetmix_tower_server.group_manager",
)
use_sudo = fields.Selection(
string="Use sudo",
selection=[("n", "Without password"), ("p", "With password")],
help="Run commands using 'sudo'",
groups="cetmix_tower_server.group_manager",
)
# ---- Variables
variable_value_ids = fields.One2many(
Expand All @@ -318,7 +334,11 @@ class CxTowerServer(models.Model):
)

# ---- Attributes
os_id = fields.Many2one(string="Operating System", comodel_name="cx.tower.os")
os_id = fields.Many2one(
string="Operating System",
comodel_name="cx.tower.os",
groups="cetmix_tower_server.group_manager",
)
tag_ids = fields.Many2many(
comodel_name="cx.tower.tag",
relation="cx_tower_server_tag_rel",
Expand Down Expand Up @@ -606,6 +626,7 @@ def _get_ssh_client(self, raise_on_error=True, timeout=5000):
SSH: SSH client instance or False and exception content
"""
self.ensure_one()
self = self.sudo()
try:
client = SSH(
host=self.ip_v4_address or self.ip_v6_address,
Expand Down Expand Up @@ -795,7 +816,7 @@ def _render_command(self, command, path=None):

# Get variable values for current server
variable_values_dict = (
self.get_variable_values(variables) # pylint: disable=no-member
self.sudo().get_variable_values(variables) # pylint: disable=no-member
if variables
else False
)
Expand Down Expand Up @@ -859,14 +880,13 @@ def execute_command(
dict(): command execution result if `no_log` context value == True else None
"""
self.ensure_one()

# Populate `sudo` value from the server settings if not provided explicitly
if sudo is None:
if self.ssh_username != "root" and self.use_sudo:
sudo = self.use_sudo
if self.sudo().ssh_username != "root" and self.sudo().use_sudo:
sudo = self.sudo().use_sudo

# Disable `sudo` if user is root
elif sudo and self.ssh_username == "root":
elif sudo and self.sudo().ssh_username == "root":
sudo = None

# Check if no log record should be created
Expand Down
2 changes: 2 additions & 0 deletions cetmix_tower_server/readme/CONFIGURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ Fill the values it the tabs below:
- **SSH Private Key**: Used for authentication is SSH Auth Mode is set to "Key"
- **Note**: Comments or user notes

Note: Some fields are visible based on the current user access level.

There is a special **Status** field which indicates current Server status. It is meant to be updated automatically using external API with further customizations.
Following pre-defined statuses are available:

Expand Down
188 changes: 107 additions & 81 deletions cetmix_tower_server/tests/common.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# Copyright (C) 2022 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from unittest.mock import MagicMock
import os
from unittest.mock import MagicMock, patch

from odoo import _
from odoo.exceptions import ValidationError
from odoo.tests import TransactionCase

from odoo.addons.cetmix_tower_server.models.cx_tower_server import SSH


class TestTowerCommon(TransactionCase):
def setUp(self, *args, **kwargs):
Expand Down Expand Up @@ -217,89 +220,112 @@ def setUp(self, *args, **kwargs):
# Flight plan log
self.PlanLog = self.env["cx.tower.plan.log"]

# Patch methods for testing
def _get_ssh_client_patch(self, raise_on_error=True, timeout=5000):
"""Mock method for connection"""
# apply ssh connection patches
self.apply_patches()

def apply_patches(self):
"""
Apply mock patches for SSH-related methods to simulate various scenarios
during testing.
This method sets up and applies the following mock patches:
1. `_connect` method:
- Returns a mocked SSH connection object that simulates the behavior of an
SSH connection.
- The `exec_command` method is patched to return:
- `stdin`: A `MagicMock` instance.
- `stdout`: A mocked object where:
- `channel.recv_exit_status` returns:
- `0` for successful commands.
- `-1` for commands that simulate a failure
(e.g., commands containing the string `"fail"`).
- `readlines` returns:
- `["ok"]` for successful commands.
- An empty list for failed commands.
- `stderr`: A mocked object where:
- `readlines` returns:
- `["error"]` for failed commands.
- An empty list for successful commands.
2. `download_file` method:
- Simulates the behavior of downloading a file and returns:
- A binary string `b"ok\x00"` for files with the `.zip` extension.
- A binary string `b"ok"` for files with all other extensions.
3. `upload_file` method:
- Returns a `MagicMock` object to simulate file upload behavior without
actual execution.
4. `delete_file` method:
- Returns a `MagicMock` object to simulate file deletion behavior without
actual execution.
The patches are applied immediately using `patch.object` and are added to
`addCleanup` to ensure they are automatically stopped after the tests are
executed.
This method should be called during the test setup phase to mock the required
behaviors.
"""

# Patch connection SSH method
def ssh_connect(self):
connection_mock = MagicMock()

# set up stdin with a condition for error simulation
def exec_command_side_effect(command, *args, **kwargs):
# Create mocks for stdin, stdout, and stderr
stdin_mock = MagicMock()
stdout_mock = MagicMock()
stderr_mock = MagicMock()

if "fail" in command:
# Simulate failure
stdout_mock.channel.recv_exit_status.return_value = -1
stdout_mock.readlines.return_value = []
stderr_mock.readlines.return_value = ["error"]
return stdin_mock, stdout_mock, stderr_mock
else:
# Simulate success
stdout_mock.channel.recv_exit_status.return_value = 0
stdout_mock.readlines.return_value = ["ok"]
stderr_mock.readlines.return_value = []
return stdin_mock, stdout_mock, stderr_mock

# Apply side effect to exec_command
connection_mock.exec_command.side_effect = exec_command_side_effect

return connection_mock

connect_patch = patch.object(SSH, "_connect", ssh_connect)
connect_patch.start()
self.addCleanup(connect_patch.stop)

# Patch file manipulation methods for testing
def ssh_download_file(self, remote_path):
_, extension = os.path.splitext(remote_path)
if extension == ".zip":
return b"ok\x00"
return b"ok"

download_file_patch = patch.object(SSH, "download_file", ssh_download_file)
download_file_patch.start()
self.addCleanup(download_file_patch.stop)

def ssh_upload_file(self, file, remote_path):
return MagicMock()

def _execute_command_using_ssh_patch(
self,
client,
command_code,
command_path=None,
raise_on_error=True,
sudo=None,
**kwargs,
):
"""Mock function to test server command execution.
It will not execute any command but just return a pre-defined result
Pass "simulated_result" to kwargs for mocked response. Eg:
"simulated_result": {"status": [0], "response": ["ok"], "error": []}
Args:
client (Bool): Anything
command_code (Text): Command text
command_path (Char, optional): Directory where command is executed
raise_on_error (bool, optional): raise if error Defaults to True.
sudo (selection, optional): Use sudo for commands. Defaults to None.
Returns:
status, [response], [error]
"""

simulated_result = kwargs.get("simulated_result")
if simulated_result:
status = simulated_result["status"]
response = simulated_result["response"]
error = simulated_result["error"]
else:
status = 0
response = ["ok"]
error = []

# Parse inline secrets
code_and_secrets = self.env[
"cx.tower.key"
]._parse_code_and_return_key_values(command_code, **kwargs.get("key", {}))
command_code = code_and_secrets["code"]
secrets = code_and_secrets["key_values"]

command = self.env["cx.tower.key"]._parse_code(
command_code, **kwargs.get("key", {})
)

if status != 0:
if raise_on_error:
raise ValidationError(_("SSH execute command error"))
return self._parse_command_results(-1, [], error, secrets, **kwargs)

command = self._prepare_ssh_command(command, command_path, sudo)

# Compose response multiple commands: sudo with password
if isinstance(command, list):
status_list = []
response_list = []
error_list = []
for cmd in command: # pylint: disable=unused-variable # noqa
status_list.append(status)
response_list += response
error_list += error

return self._parse_command_results(
status, response, error, secrets, **kwargs
)

self.Server._patch_method("_get_ssh_client", _get_ssh_client_patch)
self.Server._patch_method(
"_execute_command_using_ssh", _execute_command_using_ssh_patch
)
upload_file_patch = patch.object(SSH, "upload_file", ssh_upload_file)
upload_file_patch.start()
self.addCleanup(upload_file_patch.stop)

def ssh_delete_file(self, remote_path):
return MagicMock()

def tearDown(self):
# Remove the monkey patches
self.Server._revert_method("_get_ssh_client")
self.Server._revert_method("_execute_command_using_ssh")
super(TestTowerCommon, self).tearDown()
delete_file_patch = patch.object(SSH, "delete_file", ssh_delete_file)
delete_file_patch.start()
self.addCleanup(delete_file_patch.stop)

def add_to_group(self, user, group_refs):
"""Add user to groups
Expand Down
11 changes: 11 additions & 0 deletions cetmix_tower_server/tests/test_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,17 @@ def test_user_access_rule(self):
# Ensure that user can access the command
command_name = test_command_1_as_bob.name
self.assertEqual(command_name, "Test command", msg="Must return 'Test command'")

# Check that user with "cetmix_tower_server.group_user" can execute ssh command
test_command.write(
{
"code": "ls -l",
}
)
self.server_test_1.with_user(self.user_bob).execute_command(
test_command,
)

# Add user to group_manager
self.add_to_group(self.user_bob, "cetmix_tower_server.group_manager")
# Create a new command with access_level 1
Expand Down
Loading

0 comments on commit 162d63f

Please sign in to comment.