From c5e605d06d26c0a0614d719af75686e040f2d0b0 Mon Sep 17 00:00:00 2001 From: Nathan Figueroa Date: Thu, 26 Dec 2024 16:11:31 -0500 Subject: [PATCH] refactor: Replace 'screen' use with Docker. WIP --- configs/example.yaml | 40 ++- phyto-arm | 397 +++++++++++++------------ src/phyto_arm/launch/arm_chanos.launch | 2 +- src/phyto_arm/launch/arm_ifcb.launch | 5 +- src/phyto_arm/launch/main.launch | 5 +- src/phyto_arm/launch/rosbag.launch | 6 +- src/phyto_arm/src/web_node.py | 2 +- 7 files changed, 252 insertions(+), 205 deletions(-) diff --git a/configs/example.yaml b/configs/example.yaml index 989e965..0678e01 100644 --- a/configs/example.yaml +++ b/configs/example.yaml @@ -1,12 +1,36 @@ name: Example - -launch_args: - log_dir: /mnt/data/roslogs/ - rosbag_prefix: /mnt/data/rosbags/phyto-arm - classifier: false - ifcb_winch: false - chanos_winch: false +docker_image: whoi/phyto-arm:latest + +processes: + main: + enabled: true + volumes: + log_dir: /mnt/data/roslogs/ + rosbag_dir: /mnt/data/rosbags/ + launch_args: + rosbag_prefix: phyto-arm + classifier: false + tcp_ports: + bridge_node: 9090 + web_node: 8098 + arm_ifcb: + enabled: true + launch_args: + ifcb_winch: false + devices: + ctd_path: /dev/ttyS1 + volumes: + routines_dir: /home/ifcb/IFCBacquire/Host/Routines + data_dir: /mnt/data/ifcbdata + arm_chanos: + enabled: false + launch_args: + chanos_winch: false + devices: + ctd_path: /dev/ttyS2 + udp_ports: + rbr_port: 12345 alerts: @@ -28,10 +52,8 @@ ifcb: address: "172.17.0.1" port: 8092 serial: "111-111-111" - routines_dir: "/routines" # This path is interpreted by IFCBacquire, which may be in another # container or namespace. - data_dir: "/mnt/data/ifcbdata" arm_ifcb: diff --git a/phyto-arm b/phyto-arm index d6eee92..ec9913d 100755 --- a/phyto-arm +++ b/phyto-arm @@ -1,233 +1,250 @@ #!/usr/bin/env python3 -''' -This script is used to launch the PhytO-ARM software. - -It reads the system configuration from the provided config file and starts the -ROS nodes with the appropriate settings. - -The nodes are mostly started by roslaunch, with the notable exception of -`rosbag record`, which we want to keep recording at all costs. -''' import argparse import atexit import json import os -import shlex -import subprocess import sys import urllib.request +from pathlib import Path +import docker import yaml from scripts.config_validation import validate_config -# Load the virtual environment and Catkin workspace and return the resulting -# environment -def prep_environment(): - # Due to a long-standing bug in the setup.sh script, we must provide to the - # devel/ directory in the _CATKIN_SETUP_DIR environment variable. - # - # This also needs to be an absolute path so that Python can locate packages. - # - # Ref: https://github.com/catkin/catkin_tools/issues/376 - parent_dir = os.path.dirname(__file__) - setup_dir = os.path.abspath(os.path.join(parent_dir, 'devel')) - env = { - '_CATKIN_SETUP_DIR': setup_dir +def create_network(client): + """Create or get the Docker network for PhytO-ARM containers""" + try: + return client.networks.get("phyto-arm-net") + except docker.errors.NotFound: + return client.networks.create("phyto-arm-net", driver="bridge") + +def start_container(client, image_name, network_name, config_path: str, name: str, process_config: dict) -> docker.models.containers.Container: + container_name = f"phyto-arm-{name}" + + # Base volumes that all containers need + volumes = { + str(Path(config_path).absolute()): { + "bind": "/app/mounted_config.yaml", + "mode": "ro" + }, + str(Path("configs").absolute()): { + "bind": "/app/configs", + "mode": "ro" + } + } + + # Convert launch args to roslaunch arguments + launch_args = [] + for key, value in process_config.get('launch_args', {}).items(): + launch_args.append(f"{key}:={value}") + launch_args_str = " ".join(launch_args) + + # Convert paths to volumes and add to launch args + for vol_name, host_path in process_config.get('volumes', {}).items(): + if vol_name in launch_args: + raise ValueError(f"Volume name cannot be the same as a launch arg: {vol_name}") + + # Ensure the host directory exists + Path(host_path).mkdir(parents=True, exist_ok=True) + + # Mount within '/app/volumes' in the container + mounted_path = f"/app/volumes/{vol_name}" + volumes[host_path] = {"bind": mounted_path, "mode": "rw"} + + # Add the volume to the launch args + launch_args.append(f"{vol_name}:={mounted_path}") + + # Likewise, check for devices, mount them, and add to launch args + devices = [] + for device_name, host_path in process_config.get('devices', {}).items(): + if device_name in launch_args: + raise ValueError(f"Device name cannot be the same as a launch arg: {device_name}") + # Maintain the host path + devices.append(f"{host_path}:{host_path}:r") + launch_args.append(f"{device_name}:={host_path}") + + # And add udp ports + ports = [] + for port_name, port_value in process_config.get('udp_ports', {}).items(): + if port_name in launch_args: + raise ValueError(f"UDP port name cannot be the same as a launch arg: {port_name}") + ports.append(f"{port_name}:{port_value}/udp") + launch_args.append(f"{port_name}:={port_value}") + + # Add tcp ports + for port_name, port_value in process_config.get('tcp_ports', {}).items(): + if port_name in launch_args: + raise ValueError(f"TCP port name cannot be the same as a launch arg: {port_name}") + ports.append(f"{port_name}:{port_value}/tcp") + launch_args.append(f"{port_name}:={port_value}") + + # Build the roslaunch command based on container type + if name == "main": + command = f"""bash -c " + source devel/setup.bash && + roscore & + sleep 5 && + roslaunch --wait phyto_arm rosbag.launch config_file:=/app/mounted_config.yaml {launch_args_str} && + roslaunch --wait phyto_arm main.launch config_file:=/app/mounted_config.yaml {launch_args_str} + " """ + else: + command = f"""bash -c " + source devel/setup.bash && + roslaunch --wait phyto_arm {name}.launch config_file:=/app/mounted_config.yaml {launch_args_str} + " """ + + # Base container configuration + container_config = { + "image": image_name, + "name": container_name, + "command": command, + "detach": True, + "remove": True, + "volumes": volumes, + "network": network_name, + "devices": devices, + "ports": ports, + "environment": { + "ROS_MASTER_URI": "http://phyto-arm-main:11311" # Point to main container + } } - # Build command to load the catkin workspace (and optionally a virtual - # environment) and dump the environment. - command = f'. {shlex.quote(setup_dir)}/setup.sh && env' - - if os.getenv('NO_VIRTUALENV') is None: - command = f'. {shlex.quote(parent_dir)}/.venv/bin/activate && ' + \ - command - - # Dump the environment and parse the output - env_out = subprocess.check_output(command, shell=True, env=env) - for line in env_out.rstrip().split(b'\n'): - var, _, value = line.partition(b'=') - env[var] = value - - return env - - -# Prepare a command and environment for roslaunch -def prep_roslaunch(config, env, package, launchfile): - # Build the command-line arguments for roslaunch - rl_args = [ - 'roslaunch', - '--wait', # because we can start the ROS master ourselves - '--required', # stop if any process fails - '--skip-log-check', - package, launchfile - ] - rl_args.append(f'config_file:={os.path.abspath(args.config)}') - - for launch_arg, value in config.get('launch_args', {}).items(): - rl_args.append(f'{launch_arg}:={value}') - - # Allow the config to set a launch prefix (like gdb) for nodes. We have to - # use an environment variable for this because roslaunch will error if an - # argument is unset, while an environment variable is option. - launch_prefix = config.get('launch_args', {}).get('launch_prefix') - if launch_prefix is not None: - env = dict(env) # copy first - env['LAUNCH_PREFIX'] = launch_prefix - - # Build the command to invoke as an escaped string - # For straceing signals: strace -f -o signals.log -tt -e trace=none - command = ' '.join(shlex.quote(a) for a in rl_args) - return command, env - - -# Send alerts when the program stops -def send_alerts(alert_config, deployment, launch_name): - for alert in alert_config: - assert alert['type'] == 'slack' and alert['url'] - urllib.request.urlopen( - alert['url'], - json.dumps({ - 'text': f'*PhytO-ARM process stopped*\n - Deployment: _{deployment}_ \n - Process: _{launch_name}_' - }).encode() - ) - - -def attach(args): - os.execl('/bin/sh', '/bin/sh', '-c', 'screen -r phyto-arm') + if name == "main": + # Main container doesn't need ROS_MASTER_URI + container_config["environment"] = {} + # Start the container + try: + container = client.containers.run(**container_config) + print(f"Started container {container_name}") + return container + except docker.errors.APIError as e: + print(f"Failed to start container {container_name}: {e}") + raise + +def stop_container(container) -> None: + try: + container.stop() + print(f"Stopped container {container.name}") + except docker.errors.APIError as e: + print(f"Failed to stop container {container.name}: {e}") + +def send_alerts(config) -> None: + """Send alerts when stopping the system""" + for alert in config.get('alerts', []): + if alert['type'] == 'slack' and alert.get('url'): + try: + urllib.request.urlopen( + alert['url'], + json.dumps({ + 'text': f'*PhytO-ARM process stopped*\n - Deployment: _{config.get("name", "unknown")}_ \n' + }).encode() + ) + except Exception as e: + print(f"Failed to send alert: {e}") -def _start(args): - # Validate config file +def start(args): + client = docker.from_env() + network = create_network(client) + + with open(args.config, 'r') as f: + config = yaml.safe_load(f) + + # Validate config if not args.skip_validation: print(f'Validating config file {args.config} against {args.config_schema}...') - if validate_config(args.config, args.config_schema): - print('Config file is valid') - else: + if not validate_config(args.config, args.config_schema): sys.exit(1) - - # Load the config file - with open(args.config, 'rb') as f: - config = yaml.safe_load(f) - - # Prepare the environment - env = prep_environment() - - # Define an atexit handler that can be used to terminate subprocesses. - # Recall that these handlers are invoked in reverse order, so since we start - # roscore first, it is last to be terminated. - def cleanup(name, proc, dont_kill=False): - if proc.returncode is not None: # already dead - return - + + image_name = config['docker']['image'] + containers = {} + + def cleanup(): + for container in containers.values(): + stop_container(container) try: - print(f'terminating {name} ({proc.pid})') - proc.terminate() - proc.wait(5.0) - except subprocess.TimeoutExpired: - if dont_kill: - print(f'failed to terminate {name} ({proc.pid}), ' - 'but refusing to kill') - else: - print(f'failed to terminate {name} ({proc.pid}), killing') - proc.kill() - - proc.wait() - print(f'{name} ({proc.pid}) eventually exited') - - # Set up alerts for when we terminate - atexit.register(send_alerts, config.get('alerts', []), config.get('name'), args.launch_name) - - # Allow the config to override where logs are stored - log_dir = config.get('launch_args', {}).get('log_dir') - if log_dir is not None: - env['ROS_LOG_DIR'] = log_dir - - # The following should only launch once, with the main PhytO-ARM processes - roscore = None - if args.launch_name == "main": - # Before we continue compress any older log files - #print('Compressing older logs') - #os.system(f'find {shlex.quote(log_dir)} -iname \*.log -exec gzip {{}} \;') - - # Start the ROS core processes (ROS master, etc.) - roscore = subprocess.Popen('roscore', env=env) - atexit.register(lambda: cleanup('roscore', roscore)) - - # Start the rosbag logging process. We run this separately from the main - # launch file so that if the main nodes crash, we don't terminate the rosbag - # prematurely. - command, env = prep_roslaunch(config, env, 'phyto_arm', 'rosbag.launch') - rosbag = subprocess.Popen(command, shell=True, env=env) - atexit.register(lambda: cleanup('rosbag record', rosbag, dont_kill=True)) - - # Prepare our roslaunch command - command, env = prep_roslaunch(config, env, 'phyto_arm', f'{args.launch_name}.launch') - nodes = subprocess.Popen(command, shell=True, env=env) - atexit.register(lambda: cleanup('nodes', nodes)) - - # Wait for any child to terminate - pid, status = os.wait() - - # Since we called wait() behind subprocess's back, we need to inform it that - # the process terminated. A little hacky. - for proc in [roscore, rosbag, nodes]: - if proc is not None and proc.pid == pid: - proc.returncode = os.WEXITSTATUS(status) \ - if os.WIFEXITED(status) else -1 - - # The atexit handlers will terminate the other subprocesses - pass + network.remove() + except docker.errors.APIError: + pass + send_alerts(config) + + # Register cleanup on exit + atexit.register(cleanup) + + # Start main container first + main_process = config['processes'].get('main', None) + if main_process and main_process.get('enabled', True): + containers['main'] = start_container(client, image_name, network.name, args.config, 'main', main_process) + else: + print("Main process not found or not enabled in config") + sys.exit(1) + + # Give main container time to start roscore + print("Waiting for main container to start...") + import time + time.sleep(10) + + # Start other enabled processes + for process_name, process_args in config['processes'].items(): + if process_name != 'main' and process_args.get('enabled', True): + containers[process_name] = start_container(client, image_name, network.name, args.config, process_name, process_args) + +def stop(): + client = docker.from_env() + containers = client.containers.list(filters={"name": "phyto-arm-"}) + + if not containers: + print("No running PhytO-ARM containers found") + return + # If no specific container specified, stop all + for container in containers: + stop_container(container) -def start(args): - # Use an environment variable to bypass wrapping the process in screen - if os.getenv('DONT_SCREEN'): - return _start(args) - os.environ['DONT_SCREEN'] = '1' +def attach(args): + client = docker.from_env() + containers = client.containers.list(filters={"name": "phyto-arm-"}) - # Check if there is a screen session running already - try: - subprocess.check_call( - ['screen', '-list', 'phyto-arm'], - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL - ) - print('PhytO-ARM appears to be running already, aborting') + if not containers: + print("No running PhytO-ARM containers found") return - except subprocess.CalledProcessError: - pass - - # Re-run this command in a screen session - command = 'screen -L -dmS phyto-arm ' + \ - ' '.join(shlex.quote(a) for a in sys.argv) - os.execl('/bin/sh', '/bin/sh', '-c', command) + # If no specific container specified, attach to main + container_name = f"phyto-arm-{args.name}" if args.name else "phyto-arm-main" -def stop(args): - os.execl('/bin/sh', '/bin/sh', '-c', 'screen -S phyto-arm -X quit') + # Find the requested container + container = next((c for c in containers if c.name == container_name), None) + if not container: + print(f"Container {container_name} not found") + return + # Attach to the container + os.system(f"docker attach {container.id}") if __name__ == '__main__': - # Parse command-line arguments parser = argparse.ArgumentParser(description=__doc__) subparsers = parser.add_subparsers(dest='command', required=True) - attach_parser = subparsers.add_parser('attach') - + # Start command start_parser = subparsers.add_parser('start') - start_parser.add_argument('launch_name') start_parser.add_argument('config') start_parser.add_argument('--config_schema', default='./configs/example.yaml') start_parser.add_argument('--skip_validation', action='store_true') + # Stop command stop_parser = subparsers.add_parser('stop') + stop_parser.add_argument('config') - args = parser.parse_args() + # Attach command + attach_parser = subparsers.add_parser('attach') + attach_parser.add_argument('name', nargs='?', help='Container name to attach to (defaults to main)') + + parsed_args = parser.parse_args() # Invoke the handler for this subcommand { - 'attach': attach, 'start': start, - 'stop': stop, - }[args.command](args) + 'stop': stop, + 'attach': attach, + }[parsed_args.command](parsed_args) diff --git a/src/phyto_arm/launch/arm_chanos.launch b/src/phyto_arm/launch/arm_chanos.launch index ec8ea9d..9cb7a74 100644 --- a/src/phyto_arm/launch/arm_chanos.launch +++ b/src/phyto_arm/launch/arm_chanos.launch @@ -6,7 +6,7 @@ - + diff --git a/src/phyto_arm/launch/arm_ifcb.launch b/src/phyto_arm/launch/arm_ifcb.launch index 1b43a01..a050ccc 100644 --- a/src/phyto_arm/launch/arm_ifcb.launch +++ b/src/phyto_arm/launch/arm_ifcb.launch @@ -2,7 +2,10 @@ - + + + + diff --git a/src/phyto_arm/launch/main.launch b/src/phyto_arm/launch/main.launch index 33e4f7e..ec1b090 100644 --- a/src/phyto_arm/launch/main.launch +++ b/src/phyto_arm/launch/main.launch @@ -7,6 +7,7 @@ + @@ -22,7 +23,9 @@ - + + + diff --git a/src/phyto_arm/launch/rosbag.launch b/src/phyto_arm/launch/rosbag.launch index 40c7666..3888aba 100644 --- a/src/phyto_arm/launch/rosbag.launch +++ b/src/phyto_arm/launch/rosbag.launch @@ -8,10 +8,12 @@ In the future, we might process inactive rosbags to, for example, correct timestamps, or generate reports. See http://wiki.ros.org/rosbag/Cookbook --> - + + + args="$(eval 'record --all --exclude /camera/.*|/ifcb/image|/ifcb/in|/ifcb/roi/.*|/rosout_agg --output-prefix ' + arg('rosbag_dir') + arg('rosbag_prefix') + (' --split --size ' + str(arg('rosbag_size')) if arg('rosbag_size') else '') + (' --split --duration ' + str(arg('rosbag_duration')) if arg('rosbag_duration') else '') + ' --publish --lz4')" /> + diff --git a/src/phyto_arm/src/web_node.py b/src/phyto_arm/src/web_node.py index 18095cc..8e906f4 100755 --- a/src/phyto_arm/src/web_node.py +++ b/src/phyto_arm/src/web_node.py @@ -66,7 +66,7 @@ def main(): functools.partial(capture_field_value, name, config['topic_field'])) else: rospy.logerr(f'Config {name} invalid, must have "default" or "environment" set') - web.run_app(app, port=8098) + web.run_app(app, port=rospy.get_param('~port')) if __name__ == '__main__':