From 96a8db34371c3ca8970a84a992e94fcaad2a3eae Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Mon, 2 Dec 2024 22:10:35 -0600 Subject: [PATCH 01/12] Use pkill -9 instead of just pkill --- jarvis_util/shell/process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jarvis_util/shell/process.py b/jarvis_util/shell/process.py index 073f5d0..8a8962b 100644 --- a/jarvis_util/shell/process.py +++ b/jarvis_util/shell/process.py @@ -20,4 +20,4 @@ def __init__(self, cmd, exec_info, partial=True): :param exec_info: Info needed to execute the command """ partial_cmd = "-f" if partial else "" - super().__init__(f"pkill {partial_cmd} {cmd}", exec_info) + super().__init__(f"pkill -9 {partial_cmd} {cmd}", exec_info) From 078cc7f1c37fe9c2620c7e5a425139bf26533d72 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Mon, 9 Dec 2024 09:25:10 -0600 Subject: [PATCH 02/12] escape ; in ssh forward --- jarvis_util/shell/ssh_exec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jarvis_util/shell/ssh_exec.py b/jarvis_util/shell/ssh_exec.py index 36e6ec3..fde1b63 100644 --- a/jarvis_util/shell/ssh_exec.py +++ b/jarvis_util/shell/ssh_exec.py @@ -32,7 +32,7 @@ def __init__(self, cmd, exec_info): exec_info.mod(env=exec_info.basic_env, sudo=False)) else: - super().__init__(cmd, exec_info.mod(sudo=False)) + super().__init__(cmd.replace(';', '\;'), exec_info.mod(sudo=False)) def ssh_cmd(self, cmd): lines = ['ssh'] From ad2d8cf0ee24b27a46ea14a8850eba69d64b0f54 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Mon, 9 Dec 2024 09:30:40 -0600 Subject: [PATCH 03/12] Double-quotes --- jarvis_util/shell/ssh_exec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jarvis_util/shell/ssh_exec.py b/jarvis_util/shell/ssh_exec.py index fde1b63..f0f13e5 100644 --- a/jarvis_util/shell/ssh_exec.py +++ b/jarvis_util/shell/ssh_exec.py @@ -32,7 +32,7 @@ def __init__(self, cmd, exec_info): exec_info.mod(env=exec_info.basic_env, sudo=False)) else: - super().__init__(cmd.replace(';', '\;'), exec_info.mod(sudo=False)) + super().__init__(f'\"{cmd}\"', exec_info.mod(sudo=False)) def ssh_cmd(self, cmd): lines = ['ssh'] @@ -49,7 +49,7 @@ def ssh_cmd(self, cmd): cmd_lines = [] if self.ssh_env is not None: for key, val in self.ssh_env.items(): - cmd_lines.append(f'{key}=\"{val}\"') + cmd_lines.append(f'{key}=\'{val}\'') cmd_lines.append(cmd) env_cmd = ' '.join(cmd_lines) real_cmd = f'{ssh_cmd} {env_cmd}' From 1ada2de69e0e5ca0d32867706e2507cc42c22135 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Mon, 9 Dec 2024 09:32:01 -0600 Subject: [PATCH 04/12] Double-quotes --- jarvis_util/shell/ssh_exec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jarvis_util/shell/ssh_exec.py b/jarvis_util/shell/ssh_exec.py index f0f13e5..2b59a0e 100644 --- a/jarvis_util/shell/ssh_exec.py +++ b/jarvis_util/shell/ssh_exec.py @@ -32,7 +32,7 @@ def __init__(self, cmd, exec_info): exec_info.mod(env=exec_info.basic_env, sudo=False)) else: - super().__init__(f'\"{cmd}\"', exec_info.mod(sudo=False)) + super().__init__(cmd, exec_info.mod(sudo=False)) def ssh_cmd(self, cmd): lines = ['ssh'] @@ -52,7 +52,7 @@ def ssh_cmd(self, cmd): cmd_lines.append(f'{key}=\'{val}\'') cmd_lines.append(cmd) env_cmd = ' '.join(cmd_lines) - real_cmd = f'{ssh_cmd} {env_cmd}' + real_cmd = f'\"{ssh_cmd} {env_cmd}\"' return real_cmd From 7a28397c7150ee6431050761c94862e565925199 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Mon, 9 Dec 2024 09:33:46 -0600 Subject: [PATCH 05/12] Double-quotes --- jarvis_util/shell/ssh_exec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jarvis_util/shell/ssh_exec.py b/jarvis_util/shell/ssh_exec.py index 2b59a0e..1650e85 100644 --- a/jarvis_util/shell/ssh_exec.py +++ b/jarvis_util/shell/ssh_exec.py @@ -50,9 +50,9 @@ def ssh_cmd(self, cmd): if self.ssh_env is not None: for key, val in self.ssh_env.items(): cmd_lines.append(f'{key}=\'{val}\'') - cmd_lines.append(cmd) + cmd_lines.append(f'\"cmd\"') env_cmd = ' '.join(cmd_lines) - real_cmd = f'\"{ssh_cmd} {env_cmd}\"' + real_cmd = f'{ssh_cmd} {env_cmd}' return real_cmd From 4c6f8c02539108edd82c97ceae764e37fa2d4a76 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Mon, 9 Dec 2024 09:34:05 -0600 Subject: [PATCH 06/12] Double-quotes --- jarvis_util/shell/ssh_exec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jarvis_util/shell/ssh_exec.py b/jarvis_util/shell/ssh_exec.py index 1650e85..a9190a5 100644 --- a/jarvis_util/shell/ssh_exec.py +++ b/jarvis_util/shell/ssh_exec.py @@ -50,7 +50,7 @@ def ssh_cmd(self, cmd): if self.ssh_env is not None: for key, val in self.ssh_env.items(): cmd_lines.append(f'{key}=\'{val}\'') - cmd_lines.append(f'\"cmd\"') + cmd_lines.append(f'\"{cmd}\"') env_cmd = ' '.join(cmd_lines) real_cmd = f'{ssh_cmd} {env_cmd}' return real_cmd From 78a288233e48cd99edf20c950622b1ffb4f58e19 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Tue, 21 Jan 2025 22:48:40 -0600 Subject: [PATCH 07/12] Add chinettest --- jarvis_util/introspect/system_info.py | 335 ++++++++++++++++---------- jarvis_util/util/small_df.py | 2 +- 2 files changed, 206 insertions(+), 131 deletions(-) diff --git a/jarvis_util/introspect/system_info.py b/jarvis_util/introspect/system_info.py index b181037..57f01af 100644 --- a/jarvis_util/introspect/system_info.py +++ b/jarvis_util/introspect/system_info.py @@ -7,14 +7,18 @@ import re import platform from jarvis_util.shell.exec import Exec +from jarvis_util.shell.local_exec import LocalExec, LocalExecInfo from jarvis_util.util.size_conv import SizeConv from jarvis_util.serialize.yaml_file import YamlFile +from jarvis_util.shell.process import Kill import jarvis_util.util.small_df as sdf import json import yaml import shlex import ipaddress import copy +import time +import os # pylint: disable=C0121 @@ -214,7 +218,6 @@ def wait(self): dev['host'] = host total.append(dev) self.df = sdf.SmallDf(rows=total) - print(self.df) class Blkid(Exec): @@ -332,6 +335,65 @@ def wait(self): self.df.drop_duplicates() +class ChiNetPing(Exec): + """ + Determine whether a network functions across a set of hosts + """ + def __init__(self, provider, domain, port, mode, exec_info): + self.cmd = [ + 'chi_net_ping', + exec_info.hostfile.path if exec_info.hostfile.path else '\"\"', + provider, + domain, + str(port), + mode + ] + self.cmd = ' '.join(self.cmd) + if mode == 'server': + super().__init__(self.cmd, exec_info.mod(exec_async=True, hide_output=True)) + else: + super().__init__(self.cmd, LocalExecInfo(hide_output=True)) + + +class ChiNetPingTest: + """ + Determine whether a network functions across a set of hosts + """ + def __init__(self, provider, domain, port, exec_info): + self.server = ChiNetPing(provider, domain, port, "server", exec_info.mod(exec_async=True)) + time.sleep(1) + self.client = ChiNetPing(provider, domain, port, "client", exec_info) + Kill('chi_net_ping', exec_info) + self.exit_code = self.client.exit_code + + +class NetTest(FiInfo): + """ + Determine whether a set of networks function across a set of hosts. + """ + def __init__(self, port, exec_info): + super().__init__(exec_info) + self.working = [] + df = self.df[['provider', 'domain']].drop_duplicates() + print(f'About to test {len(df)} networks') + for net in df.rows: + provider = net['provider'] + domain = net['domain'] + print(f'Testing the network {provider}://{domain}/[{exec_info.hostfile.path}]:{port}') + # Test if the network works locally + ping = ChiNetPingTest(provider, domain, port, exec_info.mod(hostfile=None)) + net['shared'] = False + if ping.exit_code == 0: + self.working.append(net) + else: + continue + # Test if the network works across hosts + ping = ChiNetPingTest(provider, domain, port, exec_info) + if ping.exit_code == 0: + net['shared'] = True + self.working = sdf.SmallDf(self.working) + + class ResourceGraph: """ Stores helpful information about storage and networking info for machines. @@ -344,6 +406,7 @@ class ResourceGraph: model: the exact model of the device dev_type: type of device fs_type: the type of filesystem (e.g., ext4) + needs_root: whether the user needs sudo to access the device uuid: filesystem-levle uuid from the FS metadata avail: total number of bytes remaining shared: whether the this is a shared service or not @@ -362,10 +425,6 @@ class ResourceGraph: """ def __init__(self): - self.lsblk = None - self.blkid = None - self.list_fs = None - self.fi_info = None self.fs_columns = [ 'parent', 'device', 'mount', 'model', 'dev_type', 'fs_type', 'uuid', @@ -376,7 +435,6 @@ def __init__(self): 'speed', 'shared' ] self.create() - self.hosts = None self.path = None """ @@ -398,7 +456,8 @@ def build(self, exec_info, introspect=True): """ self.create() if introspect: - self._introspect(exec_info) + self.introspect_fs(exec_info) + self.introspect_net(exec_info, prune_nets=True) self.apply() return self @@ -408,7 +467,7 @@ def walkthrough_build(self, exec_info, introspect=True): def walkthrough_prune(self, exec_info): print('(2/4). Finding mount points common across machines') - mounts = self.find_storage(common=True, condense=True) + mounts = self.find_storage() self.print_df(mounts) # Add missing mountpoints x = self._ask_yes_no('2.(1/3). Are there any mount points missing ' @@ -498,41 +557,8 @@ def walkthrough_prune(self, exec_info): x = self._ask_yes_no('2.3.(3/3). Any more?', default='no') # Fill in missing network information - print('(3/4). Finding network info') - net_info = self.find_net_info(exec_info.hostfile) - fabrics = net_info['fabric'].unique().list() - hostname = socket.gethostname() - ip_address = socket.gethostbyname(hostname) - print(f'This IP addr of this node ({hostname}) is: {ip_address}') - print(f'We detect the following {len(fabrics)} fabrics: {fabrics}') - x = False - # pylint: disable=W0640 - for fabric in fabrics: - fabric_info = net_info[lambda r: r['fabric'] == fabric] - print(f'3.(1/4). Now modifying {fabric}:') - print(f'Providers: {fabric_info["provider"].unique().list()}') - print(f'Domains: {fabric_info["domain"].unique().list()}') - x = self._ask_yes_no('3.(2/4). Keep this fabric?', default='no') - if not x: - net_info = net_info[lambda r: r['fabric'] != fabric] - print() - continue - shared = self._ask_yes_no('3.(3/4). ' - 'Is this fabric shared across hosts?', - default='yes') - speed = self._ask_size('3.(4/4). ' - 'What is the speed of this network?') - fabric_info['speed'] = speed - fabric_info['shared'] = shared - print() - # pylint: enable=W0640 - self.net = net_info - x = self._ask_yes_no('(4/4). Are all hosts symmetrical? I.e., ' - 'the per-node resource graphs should all ' - 'be the same.', - default='yes') - if x: - self.make_common(exec_info.hostfile) + print('(3/3). Finding valid networks') + self.introspect_net(exec_info, prune_nets=True) self.apply() def _ask_string(self, msg, default=None): @@ -583,88 +609,96 @@ def _ask_size(self, msg): size = SizeConv.to_int(x) return size - def _introspect(self, exec_info): - """ - Introspect the cluster for resources. + """ + Introspect filesystems + """ - :param exec_info: Where to collect resource information - :return: None - """ - self.lsblk = PyLsblk(exec_info.mod(hide_output=True)) - self.blkid = Blkid(exec_info.mod(hide_output=True)) - self.list_fs = ListFses(exec_info.mod(hide_output=True)) - self.fi_info = FiInfo(exec_info.mod(hide_output=True)) - self.hosts = exec_info.hostfile.hosts - self.fs = sdf.merge([self.fs, self.lsblk.df, self.blkid.df], + def introspect_fs(self, exec_info, sudo=False): + lsblk = PyLsblk(exec_info.mod(hide_output=True)) + blkid = Blkid(exec_info.mod(hide_output=True)) + list_fs = ListFses(exec_info.mod(hide_output=True)) + fs = sdf.merge([lsblk.df, blkid.df], + on=['device', 'host'], + how='outer') + fs[:, 'shared'] = False + fs = sdf.merge([fs, list_fs.df], on=['device', 'host'], how='outer') - self.fs[:, 'shared'] = False - self.fs = sdf.merge([self.fs, self.list_fs.df], - on=['device', 'host'], - how='outer') - self.fs.drop_columns([ + fs = self._find_common_mounts(fs, exec_info) + fs = self._label_user_mounts(fs) + fs = fs.drop_columns([ 'used', 'use%', 'fs_mount', 'partuuid', 'fs_size', - 'partlabel', 'label']) - net_df = self.fi_info.df - net_df[:, 'speed'] = 0 - net_df.drop_columns(['version', 'type', 'protocol']) - net_df.drop_duplicates() - self.net = net_df - - def save(self, path): + 'partlabel', 'label', 'host']) + self.fs = fs + return self.fs + + def _find_common_mounts(self, fs, exec_info): """ - Save the resource graph YAML file - - :param path: the path to save the file - :return: None + Finds mount point points common across all hosts """ - graph = { - 'hosts': self.hosts, - 'fs': self.fs.rows, - 'net': self.net.rows, - } - YamlFile(path).save(graph) - self.path = path + io_groups = fs.groupby(['mount', 'device']) + common = [] + for name, group in io_groups.groups.items(): + if len(group) != len(exec_info.hostfile.hosts): + continue + common.append(group.rows[0]) + return sdf.SmallDf(common) - def load(self, path): + def _label_user_mounts(self, fs): """ - Load resource graph from storage. - - :param path: The path to the resource graph YAML file - :return: self + Try to find folders/directories the current user + can access without root priveleges in each mount """ - graph = YamlFile(path).load() - self.path = path - self.hosts = graph['hosts'] - self.fs = sdf.SmallDf(graph['fs'], columns=self.fs_columns) - self.net = sdf.SmallDf(graph['net'], columns=self.net_columns) - self.apply() - return self - - def _derive_storage_cols(self): - df = self.fs - if df is None or len(df) == 0: - return - df['mount'].fillna('') - df['shared'].fillna(True) - df['tran'].fillna('') - df['size'].fillna(0) - df['size'].apply(lambda r, c: SizeConv.to_int(r[c])) - noavail = df[lambda r: r['avail'] == 0 or r['avail'] is None, :] - noavail['avail'] = noavail['size'] - df['avail'].apply(lambda r, c: SizeConv.to_int(r[c])) + for dev in fs.rows: + dev['needs_root'] = True + if dev['mount'] is None: + continue + if not dev['mount'].startswith('/'): + continue + dev['mount'] = self._try_user_access_paths(dev, fs) + return sdf.SmallDf(fs.rows) + + def _try_user_access_paths(self, dev, fs): + username = os.getenv('USER') or os.getenv('USERNAME') + paths = [ + dev['mount'], + os.path.join(dev["mount"], username), + os.path.join(dev["mount"], 'users', username), + os.path.join(dev["mount"], 'home', username), + ] + for path in paths: + if self._try_user_access(fs, path, path == dev['mount']): + dev['needs_root'] = True + return path + dev['needs_root'] = False + return dev['mount'] + + def _try_user_access(self, fs, mount, known_mount=False): + try: + if not known_mount and self._check_if_mounted(fs, mount): + return False + if os.access(mount, os.R_OK) and os.access(mount, os.W_OK): + return True + except (PermissionError, OSError): + return False - def _derive_net_cols(self): - self.net['domain'].fillna('') + def _check_if_mounted(self, fs, mount): + return len(fs[lambda r: r['mount'] == mount]) > 0 - def set_hosts(self, hosts): - """ - Set the set of hosts this resource graph covers + """ + Introspect networks + """ - :param hosts: Hostfile() - :return: None - """ - self.hosts = hosts.hosts_ip + def introspect_net(self, exec_info, prune_nets=False, prune_port=4192): + if not prune_nets: + fi_info = FiInfo(exec_info.mod(hide_output=True)) + else: + fi_info = NetTest(prune_port, exec_info.mod(hide_output=True)) + net_df = fi_info.df + net_df[:, 'speed'] = 0 + net_df.drop_columns(['version', 'type', 'protocol']) + net_df.drop_duplicates() + self.net = net_df """ Update the resource graph @@ -757,6 +791,50 @@ def apply(self): self._derive_net_cols() self._derive_storage_cols() + def save(self, path): + """ + Save the resource graph YAML file + + :param path: the path to save the file + :return: None + """ + graph = { + 'fs': self.fs.rows, + 'net': self.net.rows, + } + YamlFile(path).save(graph) + self.path = path + + def load(self, path): + """ + Load resource graph from storage. + + :param path: The path to the resource graph YAML file + :return: self + """ + graph = YamlFile(path).load() + self.path = path + self.fs = sdf.SmallDf(graph['fs'], columns=self.fs_columns) + self.net = sdf.SmallDf(graph['net'], columns=self.net_columns) + self.apply() + return self + + def _derive_storage_cols(self): + df = self.fs + if df is None or len(df) == 0: + return + df['mount'].fillna('') + df['shared'].fillna(True) + df['tran'].fillna('') + df['size'].fillna(0) + df['size'].apply(lambda r, c: SizeConv.to_int(r[c])) + noavail = df[lambda r: r['avail'] == 0 or r['avail'] is None, :] + noavail['avail'] = noavail['size'] + df['avail'].apply(lambda r, c: SizeConv.to_int(r[c])) + + def _derive_net_cols(self): + self.net['domain'].fillna('') + """ Query the resource graph """ @@ -770,11 +848,19 @@ def find_shared_storage(self): df = self.fs return df[lambda r: r['shared'] == True] + def find_user_storage(self): + """ + Find the set of user-accessible storage services + + :return: Dataframe + """ + df = self.fs + return df[lambda r: r['needs_root'] == False] + def find_storage(self, dev_types=None, is_mounted=True, - common=False, - condense=False, + needs_root = None, count_per_node=None, count_per_dev=None, min_cap=None, @@ -788,9 +874,8 @@ def find_storage(self, :param dev_types: Search for devices of type in order. Either a list or a string. :param is_mounted: Search only for mounted devices - :param common: Remove mount points that are not common across all hosts - :param condense: Used in conjunction with common. Will remove the 'host' column and will only contain one entry per mount point. + :param needs_root: Search for devices that do or don't need root access. :param count_per_node: Choose only a subset of devices matching query :param count_per_dev: Choose only a subset of devices matching query :param min_cap: Remove devices with too little overall capacity @@ -811,15 +896,14 @@ def find_storage(self, mount_res = [mount_res] df = df[lambda r: any(re.match(reg, str(r['mount'])) for reg in mount_res)] + # Filter devices by whether or not root is needed + if needs_root is not None: + df = df[lambda r: r['needs_root'] == needs_root] # Find devices of a particular type if dev_types is not None: if not isinstance(dev_types, (list, tuple, set)): dev_types = [dev_types] df = df[lambda r: str(r['dev_type']) in dev_types] - # Get the set of mounts common between all hosts - if common: - df = df.groupby(['mount']).filter_groups( - lambda x: len(x) == len(self.hosts)).reset_index() # Remove storage with too little capacity if min_cap is not None: df = df[lambda r: r['size'] >= min_cap] @@ -833,8 +917,6 @@ def find_storage(self, # Take a certain number of matched devices per-host if count_per_node is not None: df = df.groupby('host').head(count_per_node).reset_index() - if common and condense: - df = df.groupby(['mount']).first().reset_index() # df = df.drop_columns('host') if shared is not None: df = df[lambda r: r['shared'] == shared] @@ -857,7 +939,6 @@ def find_net_info(self, hosts=None, strip_ips=False, providers=None, - condense=False, shared=None, df=None): """ @@ -867,7 +948,6 @@ def find_net_info(self, all hosts to find network information for :param strip_ips: remove IPs that are not compatible with the hostfile :param providers: The network protocols to search for. - :param condense: Only retain information for a single host :param df: The df to use for this query :param shared: Filter out local networks :return: Dataframe @@ -879,11 +959,6 @@ def find_net_info(self, if strip_ips: ips = [ipaddress.ip_address(ip) for ip in hosts.hosts_ip] df = df[lambda r: self._subnet_matches_hosts(r['fabric'], ips)] - # Filter out protocols which are not common between these hosts - if condense: - grp = df.groupby(['provider', 'domain']).filter_groups( - lambda x: len(x) >= len(hosts)) - df = grp.first().reset_index() # Choose only a subset of providers if providers is not None: if not isinstance(providers, (list, set)): diff --git a/jarvis_util/util/small_df.py b/jarvis_util/util/small_df.py index b1738e3..8b2e97f 100644 --- a/jarvis_util/util/small_df.py +++ b/jarvis_util/util/small_df.py @@ -60,7 +60,7 @@ def _drop_duplicates(self, rows): return self._mutable_dict(dedup) def _fixed_dict(self, rows): - return tuple((tuple(row.items()) for row in rows)) + return tuple(tuple((key, row[key]) for key in self.columns) for row in rows) def _mutable_dict(self, rows): # return [{key:val for key, val in row} for row in rows] From e8aab7c1f8753e1dba981e2c7eede2c923eb6601 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Tue, 21 Jan 2025 23:06:35 -0600 Subject: [PATCH 08/12] Also add fabric as a test option --- jarvis_util/introspect/system_info.py | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/jarvis_util/introspect/system_info.py b/jarvis_util/introspect/system_info.py index 57f01af..2181382 100644 --- a/jarvis_util/introspect/system_info.py +++ b/jarvis_util/introspect/system_info.py @@ -374,7 +374,7 @@ class NetTest(FiInfo): def __init__(self, port, exec_info): super().__init__(exec_info) self.working = [] - df = self.df[['provider', 'domain']].drop_duplicates() + df = self.df[['provider', 'domain', 'fabric']].drop_duplicates() print(f'About to test {len(df)} networks') for net in df.rows: provider = net['provider'] @@ -391,7 +391,7 @@ def __init__(self, port, exec_info): ping = ChiNetPingTest(provider, domain, port, exec_info) if ping.exit_code == 0: net['shared'] = True - self.working = sdf.SmallDf(self.working) + self.df = sdf.SmallDf(self.working) class ResourceGraph: @@ -428,10 +428,10 @@ def __init__(self): self.fs_columns = [ 'parent', 'device', 'mount', 'model', 'dev_type', 'fs_type', 'uuid', - 'avail', 'shared', 'host', + 'avail', 'shared', 'needs_root' ] self.net_columns = [ - 'provider', 'fabric', 'domain', 'host', + 'provider', 'fabric', 'domain', 'speed', 'shared' ] self.create() @@ -668,13 +668,15 @@ def _try_user_access_paths(self, dev, fs): ] for path in paths: if self._try_user_access(fs, path, path == dev['mount']): - dev['needs_root'] = True + dev['needs_root'] = False return path - dev['needs_root'] = False + dev['needs_root'] = True return dev['mount'] def _try_user_access(self, fs, mount, known_mount=False): try: + if mount.startswith('/boot'): + print(mount) if not known_mount and self._check_if_mounted(fs, mount): return False if os.access(mount, os.R_OK) and os.access(mount, os.W_OK): @@ -975,17 +977,7 @@ def find_net_info(self, def print_df(self, df): if 'device' in df.columns: - if 'host' in df.columns: - col = ['host', 'mount', 'device', 'dev_type', 'shared', - 'avail', 'tran', 'rota', 'fs_type'] - df = df[col] - df = df.sort_values('host') - print(df.to_string()) - else: - col = ['device', 'mount', 'dev_type', 'shared', - 'avail', 'tran', 'rota', 'fs_type'] - df = df[col] - print(df.to_string()) + print(df.sort_values('mount').to_string()) else: print(df.sort_values('provider').to_string()) From f7e75fe15eaf38c9095a0d2d246c9dd1a1f402ed Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Tue, 21 Jan 2025 23:47:43 -0600 Subject: [PATCH 09/12] Add optional exclusion list to nettest --- jarvis_util/introspect/system_info.py | 166 +++----------------------- jarvis_util/util/small_df.py | 9 ++ 2 files changed, 24 insertions(+), 151 deletions(-) diff --git a/jarvis_util/introspect/system_info.py b/jarvis_util/introspect/system_info.py index 2181382..0c773f7 100644 --- a/jarvis_util/introspect/system_info.py +++ b/jarvis_util/introspect/system_info.py @@ -371,15 +371,19 @@ class NetTest(FiInfo): """ Determine whether a set of networks function across a set of hosts. """ - def __init__(self, port, exec_info): + def __init__(self, port, exec_info, exclusions=None): super().__init__(exec_info) self.working = [] df = self.df[['provider', 'domain', 'fabric']].drop_duplicates() + if exclusions: + exclusions = exclusions[['provider', 'domain', 'fabric']].drop_duplicates() + df = df[lambda r: r not in exclusions] print(f'About to test {len(df)} networks') for net in df.rows: provider = net['provider'] domain = net['domain'] - print(f'Testing the network {provider}://{domain}/[{exec_info.hostfile.path}]:{port}') + fabric = net['fabric'] + print(f'Testing the network {provider}://{domain}/[{fabric}]:{port}') # Test if the network works locally ping = ChiNetPingTest(provider, domain, port, exec_info.mod(hostfile=None)) net['shared'] = False @@ -460,154 +464,11 @@ def build(self, exec_info, introspect=True): self.introspect_net(exec_info, prune_nets=True) self.apply() return self - - def walkthrough_build(self, exec_info, introspect=True): - self.build(exec_info, introspect) - self.walkthrough_prune(exec_info) - - def walkthrough_prune(self, exec_info): - print('(2/4). Finding mount points common across machines') - mounts = self.find_storage() - self.print_df(mounts) - # Add missing mountpoints - x = self._ask_yes_no('2.(1/3). Are there any mount points missing ' - 'you would like to add?', - default='no') - new_devs = [] - while x: - mount = self._ask_string('2.1.(1/6). Mount point') - mount = mount.replace(r'\$', '$') - dev_type = self._ask_choices('2.1.(2/6). What transport?', - choices=['hdd', 'ssd', 'nvme', 'pmem']) - shared = self._ask_yes_no('2.1.(3/6). Is this device shared? ' - 'I.e., a PFS?') - avail = self._ask_size('2.1.(4/6). How much capacity are you ' - 'willing to use?') - y = self._ask_yes_no('2.1.(5/6). Are you sure this is accurate?', - default='yes') - if not y: - continue - new_devs.append({ - 'mount': mount, - 'dev_type': dev_type, - 'shared': shared, - 'avail': avail, - 'size': avail, - }) - x = self._ask_yes_no('2.1.(6/6). Are there any other ' - 'devices you would like to add?', - default='no') - if x is None: - x = False - self.add_storage(exec_info.hostfile, new_devs) - # Correct discovered mount points - x = self._ask_yes_no('2.(2/3). Would you like to correct ' - 'any mountpoints?', - default='no') - while x: - regex = self._ask_re('2.2.(1/3). Enter a regex of mount ' - 'points to select', - default='.*').strip() - if regex is None: - regex = '.*' - if regex.endswith('*'): - regex = f'^{regex}' - else: - regex = f'^{regex}$' - matches = mounts[lambda r: re.match(regex, r['mount']), 'mount'] - print(matches.to_string()) - y = self._ask_yes_no('Is this correct?', default='yes') - if not y: - continue - suffix = self._ask_string('2.2.(2/3). Enter a suffix to ' - 'append to these paths.', - default='${USER}') - y = self._ask_yes_no('Are you sure this is accurate?', - default='yes') - if not y: - continue - suffix = suffix.replace(r'\$', '$') - self.add_suffix(regex, mount_suffix=suffix) - x = self._ask_yes_no('2.2.(3/3). Do you want to select more ' - 'mount points?', - default='no') - # Eliminate unneded mount points - x = self._ask_yes_no( - '2.(3/3). Would you like to remove any mount points?', - default='no') - mounts = self.fs['mount'].unique().list() - print(f'Mount points: {mounts}') - while x: - regex = self._ask_re('2.3.(1/3). Enter a regex of mount ' - 'points to remove.').strip() - if regex is None: - regex = '.*' - if regex.endswith('*'): - regex = f'^{regex}' - else: - regex = f'^{regex}$' - matches = self.fs[lambda r: re.match(regex, r['mount']), 'mount'] - print(matches.to_string()) - y = self._ask_yes_no('2.3.(2/3). Is this correct?', default='yes') - if not y: - continue - self.fs = self.fs[lambda r: not re.match(regex, r['mount'])] - mounts = self.fs['mount'].unique().list() - print(f'Mount points: {mounts}') - x = self._ask_yes_no('2.3.(3/3). Any more?', default='no') - - # Fill in missing network information - print('(3/3). Finding valid networks') + + def modify(self, exec_info): + self.introspect_fs(exec_info) self.introspect_net(exec_info, prune_nets=True) - self.apply() - - def _ask_string(self, msg, default=None): - if default is None: - x = input(f'{msg}: ') - else: - x = input(f'{msg} (Default: {default}): ') - if len(x) == 0: - x = default - return x - - def _ask_re(self, msg, default=None): - if default is not None: - msg = f'{msg} (Default: {default})' - x = input(f'{msg}: ') - if len(x) == 0: - x = default - if x is None: - x = '' - return x - - def _ask_yes_no(self, msg, default=None): - while True: - msg = f'{msg} (yes/no)' - if default is not None: - msg = f'{msg} (Default: {default})' - x = input(f'{msg}: ') - if x == '': - x = default - if x == 'yes': - return True - elif x == 'no': - return False - else: - print(f'{x} is not either yes or no') - - def _ask_choices(self, msg, choices): - choices_str = '/'.join(choices) - while True: - x = input(f'{msg} ({choices_str}): ') - if x in choices: - return x - else: - print(f'{x} is not a valid choice') - - def _ask_size(self, msg): - x = input(f'{msg} (kK,mM,gG,tT,pP): ') - size = SizeConv.to_int(x) - return size + pass """ Introspect filesystems @@ -695,12 +556,15 @@ def introspect_net(self, exec_info, prune_nets=False, prune_port=4192): if not prune_nets: fi_info = FiInfo(exec_info.mod(hide_output=True)) else: - fi_info = NetTest(prune_port, exec_info.mod(hide_output=True)) + fi_info = NetTest(prune_port, exec_info.mod(hide_output=True), exclusions=self.net) net_df = fi_info.df net_df[:, 'speed'] = 0 net_df.drop_columns(['version', 'type', 'protocol']) net_df.drop_duplicates() - self.net = net_df + if self.net: + self.net = sdf.concat([self.net, net_df]) + else: + self.net = net_df """ Update the resource graph diff --git a/jarvis_util/util/small_df.py b/jarvis_util/util/small_df.py index 8b2e97f..ffc6185 100644 --- a/jarvis_util/util/small_df.py +++ b/jarvis_util/util/small_df.py @@ -370,6 +370,15 @@ def _opeq(self, other, func): for col in df.columns: row[col] = orow[col] return self + + def __contains__(self, row): + """ + Check if a row is in the dataframe + + :param row: The row to check + :return: bool + """ + return row in self.rows def __add__(self, other): return self._op(other, From e8b60810f49f67bd62207d973804e7f2327d7bf0 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Wed, 22 Jan 2025 01:11:28 -0600 Subject: [PATCH 10/12] Remove ping from kill and make time variable --- jarvis_util/introspect/system_info.py | 61 ++++++++++++++++++--------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/jarvis_util/introspect/system_info.py b/jarvis_util/introspect/system_info.py index 0c773f7..2f56b15 100644 --- a/jarvis_util/introspect/system_info.py +++ b/jarvis_util/introspect/system_info.py @@ -12,6 +12,7 @@ from jarvis_util.serialize.yaml_file import YamlFile from jarvis_util.shell.process import Kill import jarvis_util.util.small_df as sdf +import threading import json import yaml import shlex @@ -359,19 +360,19 @@ class ChiNetPingTest: """ Determine whether a network functions across a set of hosts """ - def __init__(self, provider, domain, port, exec_info): + def __init__(self, provider, domain, port, exec_info, sleep=10): self.server = ChiNetPing(provider, domain, port, "server", exec_info.mod(exec_async=True)) - time.sleep(1) + time.sleep(sleep) self.client = ChiNetPing(provider, domain, port, "client", exec_info) - Kill('chi_net_ping', exec_info) self.exit_code = self.client.exit_code + # Kill('chi_net_ping', exec_info) class NetTest(FiInfo): """ Determine whether a set of networks function across a set of hosts. """ - def __init__(self, port, exec_info, exclusions=None): + def __init__(self, port, exec_info, exclusions=None, base_port=6040): super().__init__(exec_info) self.working = [] df = self.df[['provider', 'domain', 'fabric']].drop_duplicates() @@ -379,23 +380,43 @@ def __init__(self, port, exec_info, exclusions=None): exclusions = exclusions[['provider', 'domain', 'fabric']].drop_duplicates() df = df[lambda r: r not in exclusions] print(f'About to test {len(df)} networks') - for net in df.rows: - provider = net['provider'] - domain = net['domain'] - fabric = net['fabric'] - print(f'Testing the network {provider}://{domain}/[{fabric}]:{port}') - # Test if the network works locally - ping = ChiNetPingTest(provider, domain, port, exec_info.mod(hostfile=None)) - net['shared'] = False - if ping.exit_code == 0: - self.working.append(net) - else: - continue - # Test if the network works across hosts - ping = ChiNetPingTest(provider, domain, port, exec_info) - if ping.exit_code == 0: - net['shared'] = True + port = base_port + threads = [] + self.results = [None] * len(df) + for idx, net in enumerate(df.rows): + # Start a new thread for each network test + thread = threading.Thread(target=self._async_test, args=(idx, net, port, exec_info)) + threads.append(thread) + thread.start() + port += 1 + # thread.join() + + # Wait for all threads to complete + for idx, thread in enumerate(threads): + thread.join() + result = self.results[idx] + if result is not None: + self.working.append(result) + self.df = sdf.SmallDf(self.working) + Kill('chi_net_ping', exec_info) + + def _async_test(self, idx, net, port, exec_info): + provider = net['provider'] + domain = net['domain'] + fabric = net['fabric'] + # Test if the network works locally + ping = ChiNetPingTest(provider, domain, port, exec_info.mod(hostfile=None)) + net['shared'] = False + if ping.exit_code != 0: + print(f'Testing the network {provider}://{domain}/[{fabric}]:{port}: FAILED {ping.exit_code}') + return + self.results[idx] = net + # Test if the network works across hosts + ping = ChiNetPingTest(provider, domain, port, exec_info) + if ping.exit_code == 0: + net['shared'] = True + print(f'Testing the network {provider}://{domain}/[{fabric}]:{port}: SUCCESS') class ResourceGraph: From 9df61793a19591c763ccb86f96fbd89376b83fad Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Wed, 22 Jan 2025 01:14:18 -0600 Subject: [PATCH 11/12] Make sleep more configurable in nettest --- jarvis_util/introspect/system_info.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/jarvis_util/introspect/system_info.py b/jarvis_util/introspect/system_info.py index 2f56b15..65c8843 100644 --- a/jarvis_util/introspect/system_info.py +++ b/jarvis_util/introspect/system_info.py @@ -372,7 +372,7 @@ class NetTest(FiInfo): """ Determine whether a set of networks function across a set of hosts. """ - def __init__(self, port, exec_info, exclusions=None, base_port=6040): + def __init__(self, port, exec_info, exclusions=None, base_port=6040, sleep=10): super().__init__(exec_info) self.working = [] df = self.df[['provider', 'domain', 'fabric']].drop_duplicates() @@ -385,7 +385,7 @@ def __init__(self, port, exec_info, exclusions=None, base_port=6040): self.results = [None] * len(df) for idx, net in enumerate(df.rows): # Start a new thread for each network test - thread = threading.Thread(target=self._async_test, args=(idx, net, port, exec_info)) + thread = threading.Thread(target=self._async_test, args=(idx, net, port, exec_info, sleep)) threads.append(thread) thread.start() port += 1 @@ -401,19 +401,19 @@ def __init__(self, port, exec_info, exclusions=None, base_port=6040): self.df = sdf.SmallDf(self.working) Kill('chi_net_ping', exec_info) - def _async_test(self, idx, net, port, exec_info): + def _async_test(self, idx, net, port, exec_info, sleep): provider = net['provider'] domain = net['domain'] fabric = net['fabric'] # Test if the network works locally - ping = ChiNetPingTest(provider, domain, port, exec_info.mod(hostfile=None)) + ping = ChiNetPingTest(provider, domain, port, exec_info.mod(hostfile=None), 2) net['shared'] = False if ping.exit_code != 0: print(f'Testing the network {provider}://{domain}/[{fabric}]:{port}: FAILED {ping.exit_code}') return self.results[idx] = net # Test if the network works across hosts - ping = ChiNetPingTest(provider, domain, port, exec_info) + ping = ChiNetPingTest(provider, domain, port, exec_info, sleep) if ping.exit_code == 0: net['shared'] = True print(f'Testing the network {provider}://{domain}/[{fabric}]:{port}: SUCCESS') From 6b64f23e5b6f49d400c69e5311ecf8f72f1f555b Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Wed, 22 Jan 2025 01:20:31 -0600 Subject: [PATCH 12/12] Use net_sleep as variable name --- jarvis_util/introspect/system_info.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/jarvis_util/introspect/system_info.py b/jarvis_util/introspect/system_info.py index 65c8843..1eead80 100644 --- a/jarvis_util/introspect/system_info.py +++ b/jarvis_util/introspect/system_info.py @@ -360,9 +360,9 @@ class ChiNetPingTest: """ Determine whether a network functions across a set of hosts """ - def __init__(self, provider, domain, port, exec_info, sleep=10): + def __init__(self, provider, domain, port, exec_info, net_sleep=10): self.server = ChiNetPing(provider, domain, port, "server", exec_info.mod(exec_async=True)) - time.sleep(sleep) + time.sleep(net_sleep) self.client = ChiNetPing(provider, domain, port, "client", exec_info) self.exit_code = self.client.exit_code # Kill('chi_net_ping', exec_info) @@ -372,7 +372,7 @@ class NetTest(FiInfo): """ Determine whether a set of networks function across a set of hosts. """ - def __init__(self, port, exec_info, exclusions=None, base_port=6040, sleep=10): + def __init__(self, port, exec_info, exclusions=None, base_port=6040, net_sleep=10): super().__init__(exec_info) self.working = [] df = self.df[['provider', 'domain', 'fabric']].drop_duplicates() @@ -385,7 +385,7 @@ def __init__(self, port, exec_info, exclusions=None, base_port=6040, sleep=10): self.results = [None] * len(df) for idx, net in enumerate(df.rows): # Start a new thread for each network test - thread = threading.Thread(target=self._async_test, args=(idx, net, port, exec_info, sleep)) + thread = threading.Thread(target=self._async_test, args=(idx, net, port, exec_info, net_sleep)) threads.append(thread) thread.start() port += 1 @@ -401,7 +401,7 @@ def __init__(self, port, exec_info, exclusions=None, base_port=6040, sleep=10): self.df = sdf.SmallDf(self.working) Kill('chi_net_ping', exec_info) - def _async_test(self, idx, net, port, exec_info, sleep): + def _async_test(self, idx, net, port, exec_info, net_sleep): provider = net['provider'] domain = net['domain'] fabric = net['fabric'] @@ -413,7 +413,7 @@ def _async_test(self, idx, net, port, exec_info, sleep): return self.results[idx] = net # Test if the network works across hosts - ping = ChiNetPingTest(provider, domain, port, exec_info, sleep) + ping = ChiNetPingTest(provider, domain, port, exec_info, net_sleep) if ping.exit_code == 0: net['shared'] = True print(f'Testing the network {provider}://{domain}/[{fabric}]:{port}: SUCCESS') @@ -470,7 +470,7 @@ def create(self): self.fs = sdf.SmallDf(columns=self.fs_columns) self.net = sdf.SmallDf(columns=self.net_columns) - def build(self, exec_info, introspect=True): + def build(self, exec_info, introspect=True, net_sleep=10): """ Build a resource graph. @@ -482,7 +482,7 @@ def build(self, exec_info, introspect=True): self.create() if introspect: self.introspect_fs(exec_info) - self.introspect_net(exec_info, prune_nets=True) + self.introspect_net(exec_info, prune_nets=True, net_sleep=net_sleep) self.apply() return self @@ -573,11 +573,11 @@ def _check_if_mounted(self, fs, mount): Introspect networks """ - def introspect_net(self, exec_info, prune_nets=False, prune_port=4192): + def introspect_net(self, exec_info, prune_nets=False, prune_port=4192, net_sleep=10): if not prune_nets: fi_info = FiInfo(exec_info.mod(hide_output=True)) else: - fi_info = NetTest(prune_port, exec_info.mod(hide_output=True), exclusions=self.net) + fi_info = NetTest(prune_port, exec_info.mod(hide_output=True), exclusions=self.net, net_sleep=net_sleep) net_df = fi_info.df net_df[:, 'speed'] = 0 net_df.drop_columns(['version', 'type', 'protocol'])