From 3f76457528643a32438e6e9a8383489b9239cb8d Mon Sep 17 00:00:00 2001 From: Saiful Islam Date: Mon, 14 Oct 2024 01:16:48 -0400 Subject: [PATCH] Renamed services to task --- README.md | 21 ++--- {test => examples}/example1/program1.sh | 0 {test => examples}/example1/program2.sh | 0 .../example1/shepherd-config.yml | 2 +- examples/example2/action.sh | 5 ++ examples/example2/service.sh | 6 ++ examples/example2/shepherd-config.yml | 16 ++++ {test => examples}/example3/cleanup.sh | 0 .../example3}/program1.sh | 0 .../example3}/program2.sh | 0 {test => examples}/example3/program3.sh | 0 .../example3}/run_test.sh | 0 .../example3/shepherd-config.yml | 2 +- .../sade/gen_shepherd_config_for_suas.py | 0 {test => examples}/sade/sade-config.yml | 0 {test => examples}/sade/sade-wf.yml | 0 {test => examples}/sade/simple.sh | 0 {test => examples}/sade/spawn_model.sh | 0 .../sade/start_gazebo_server.sh | 0 {test => examples}/sade/start_pose_sender.sh | 0 {test => examples}/sade/start_px4_instance.sh | 0 {test => examples}/sade/suas_template.yml.j2 | 0 shepherd/config_loader.py | 24 +++--- shepherd/log_monitor.py | 18 ++--- shepherd/program_executor.py | 76 +++++++++---------- shepherd/service_manager.py | 62 +++++++-------- shepherd/shepherd.py | 6 +- shepherd/shepherd_viz.py | 7 +- test/example2/program3.sh | 26 ------- test/example2/shepherd-config.yml | 27 ------- test/example3/program1.sh | 26 ------- test/example3/program2.sh | 26 ------- test/example3/run_test.sh | 7 -- 33 files changed, 136 insertions(+), 221 deletions(-) rename {test => examples}/example1/program1.sh (100%) rename {test => examples}/example1/program2.sh (100%) rename {test => examples}/example1/shepherd-config.yml (96%) create mode 100755 examples/example2/action.sh create mode 100755 examples/example2/service.sh create mode 100644 examples/example2/shepherd-config.yml rename {test => examples}/example3/cleanup.sh (100%) rename {test/example2 => examples/example3}/program1.sh (100%) rename {test/example2 => examples/example3}/program2.sh (100%) rename {test => examples}/example3/program3.sh (100%) rename {test/example2 => examples/example3}/run_test.sh (100%) rename {test => examples}/example3/shepherd-config.yml (98%) rename {test => examples}/sade/gen_shepherd_config_for_suas.py (100%) rename {test => examples}/sade/sade-config.yml (100%) rename {test => examples}/sade/sade-wf.yml (100%) rename {test => examples}/sade/simple.sh (100%) rename {test => examples}/sade/spawn_model.sh (100%) rename {test => examples}/sade/start_gazebo_server.sh (100%) rename {test => examples}/sade/start_pose_sender.sh (100%) rename {test => examples}/sade/start_px4_instance.sh (100%) rename {test => examples}/sade/suas_template.yml.j2 (100%) delete mode 100755 test/example2/program3.sh delete mode 100644 test/example2/shepherd-config.yml delete mode 100755 test/example3/program1.sh delete mode 100755 test/example3/program2.sh delete mode 100644 test/example3/run_test.sh diff --git a/README.md b/README.md index c8ad045..dcfb807 100644 --- a/README.md +++ b/README.md @@ -107,17 +107,18 @@ With this simple configuration, Shepherd will: ```json { - "program1": { - "initialized": 0.246384859085083, - "started": 0.24660515785217285, - "action_success": 5.349443197250366, - "final": 5.350545883178711 + "my_service": { + "initialized": 0.19824790954589844, + "started": 0.19916582107543945, + "ready": 5.461252927780151, + "stopped": 60.36078095436096, + "final": 60.36192083358765 }, - "program2": { - "initialized": 0.2456510066986084, - "started": 5.351618051528931, - "action_success": 10.464960098266602, - "final": 10.465446949005127 + "my_action": { + "initialized": 0.20003700256347656, + "started": 5.466093063354492, + "action_success": 10.657495021820068, + "final": 10.658098936080933 } } ``` diff --git a/test/example1/program1.sh b/examples/example1/program1.sh similarity index 100% rename from test/example1/program1.sh rename to examples/example1/program1.sh diff --git a/test/example1/program2.sh b/examples/example1/program2.sh similarity index 100% rename from test/example1/program2.sh rename to examples/example1/program2.sh diff --git a/test/example1/shepherd-config.yml b/examples/example1/shepherd-config.yml similarity index 96% rename from test/example1/shepherd-config.yml rename to examples/example1/shepherd-config.yml index 1d0e87e..0ab2219 100644 --- a/test/example1/shepherd-config.yml +++ b/examples/example1/shepherd-config.yml @@ -1,4 +1,4 @@ -services: +tasks: program1: command: "./program1.sh" program2: diff --git a/examples/example2/action.sh b/examples/example2/action.sh new file mode 100755 index 0000000..e69d1cb --- /dev/null +++ b/examples/example2/action.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +echo "Action is running..." +sleep 5 +echo "Action completed" \ No newline at end of file diff --git a/examples/example2/service.sh b/examples/example2/service.sh new file mode 100755 index 0000000..1692ccb --- /dev/null +++ b/examples/example2/service.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +echo "Service is starting..." +sleep 5 +echo "Service is ready" +tail -f /dev/null # Keep the service running \ No newline at end of file diff --git a/examples/example2/shepherd-config.yml b/examples/example2/shepherd-config.yml new file mode 100644 index 0000000..b6e5fae --- /dev/null +++ b/examples/example2/shepherd-config.yml @@ -0,0 +1,16 @@ +tasks: + my_service: + type: "service" + command: "./service.sh" + state: + log: + ready: "Service is ready" + my_action: + type: "action" + command: "./action.sh" + dependency: + items: + my_service: "ready" +output: + state_times: "state_transition_times.json" +max_run_time: 60 \ No newline at end of file diff --git a/test/example3/cleanup.sh b/examples/example3/cleanup.sh similarity index 100% rename from test/example3/cleanup.sh rename to examples/example3/cleanup.sh diff --git a/test/example2/program1.sh b/examples/example3/program1.sh similarity index 100% rename from test/example2/program1.sh rename to examples/example3/program1.sh diff --git a/test/example2/program2.sh b/examples/example3/program2.sh similarity index 100% rename from test/example2/program2.sh rename to examples/example3/program2.sh diff --git a/test/example3/program3.sh b/examples/example3/program3.sh similarity index 100% rename from test/example3/program3.sh rename to examples/example3/program3.sh diff --git a/test/example2/run_test.sh b/examples/example3/run_test.sh similarity index 100% rename from test/example2/run_test.sh rename to examples/example3/run_test.sh diff --git a/test/example3/shepherd-config.yml b/examples/example3/shepherd-config.yml similarity index 98% rename from test/example3/shepherd-config.yml rename to examples/example3/shepherd-config.yml index 0456a5b..c3c2b0c 100644 --- a/test/example3/shepherd-config.yml +++ b/examples/example3/shepherd-config.yml @@ -1,4 +1,4 @@ -services: +tasks: program1: command: "./program1.sh" state: diff --git a/test/sade/gen_shepherd_config_for_suas.py b/examples/sade/gen_shepherd_config_for_suas.py similarity index 100% rename from test/sade/gen_shepherd_config_for_suas.py rename to examples/sade/gen_shepherd_config_for_suas.py diff --git a/test/sade/sade-config.yml b/examples/sade/sade-config.yml similarity index 100% rename from test/sade/sade-config.yml rename to examples/sade/sade-config.yml diff --git a/test/sade/sade-wf.yml b/examples/sade/sade-wf.yml similarity index 100% rename from test/sade/sade-wf.yml rename to examples/sade/sade-wf.yml diff --git a/test/sade/simple.sh b/examples/sade/simple.sh similarity index 100% rename from test/sade/simple.sh rename to examples/sade/simple.sh diff --git a/test/sade/spawn_model.sh b/examples/sade/spawn_model.sh similarity index 100% rename from test/sade/spawn_model.sh rename to examples/sade/spawn_model.sh diff --git a/test/sade/start_gazebo_server.sh b/examples/sade/start_gazebo_server.sh similarity index 100% rename from test/sade/start_gazebo_server.sh rename to examples/sade/start_gazebo_server.sh diff --git a/test/sade/start_pose_sender.sh b/examples/sade/start_pose_sender.sh similarity index 100% rename from test/sade/start_pose_sender.sh rename to examples/sade/start_pose_sender.sh diff --git a/test/sade/start_px4_instance.sh b/examples/sade/start_px4_instance.sh similarity index 100% rename from test/sade/start_px4_instance.sh rename to examples/sade/start_px4_instance.sh diff --git a/test/sade/suas_template.yml.j2 b/examples/sade/suas_template.yml.j2 similarity index 100% rename from test/sade/suas_template.yml.j2 rename to examples/sade/suas_template.yml.j2 diff --git a/shepherd/config_loader.py b/shepherd/config_loader.py index 5123f3d..28b8869 100644 --- a/shepherd/config_loader.py +++ b/shepherd/config_loader.py @@ -19,16 +19,16 @@ def load_and_preprocess_config(filepath): def preprocess_config(config, config_path): """Automatically fills in missing stdout_path and stderr_path paths.""" - services = config.get('services', {}) + tasks = config.get('tasks', {}) stdout_dir = config.get('output', {}).get('stdout_dir', '') working_dir = os.path.dirname(os.path.abspath(config_path)) - for service_name, details in services.items(): + for task_name, details in tasks.items(): # Auto-fill log and error files if not specified if 'stdout_path' not in details: - details['stdout_path'] = f"{service_name}_stdout.log" + details['stdout_path'] = f"{task_name}_stdout.log" if 'stderr_path' not in details: - details['stderr_path'] = f"{service_name}_stderr.log" + details['stderr_path'] = f"{task_name}_stderr.log" if stdout_dir: details['stdout_path'] = os.path.join(stdout_dir, details['stdout_path']) @@ -45,23 +45,23 @@ def preprocess_config(config, config_path): def validate_and_sort_programs(config): logging.debug("Validating and sorting programs") - required_keys = ['services'] + required_keys = ['tasks'] for key in required_keys: if key not in config: raise ValueError(f"Missing required key: {key}") - services = config['services'] + tasks = config['tasks'] - for service, details in services.items(): + for task, details in tasks.items(): if 'command' not in details: - raise ValueError(f"Program {service} is missing the 'command' key") + raise ValueError(f"Program {task} is missing the 'command' key") if 'stdout_path' not in details: - raise ValueError(f"Program {service} is missing the 'stdout_path' key") + raise ValueError(f"Program {task} is missing the 'stdout_path' key") - sorted_services = topological_sort(services) - logging.debug(f"Sorted services: {sorted_services}") - return sorted_services + sorted_tasks = topological_sort(tasks) + logging.debug(f"Sorted tasks: {sorted_tasks}") + return sorted_tasks def topological_sort(programs): diff --git a/shepherd/log_monitor.py b/shepherd/log_monitor.py index edf12c2..cda18a0 100644 --- a/shepherd/log_monitor.py +++ b/shepherd/log_monitor.py @@ -3,16 +3,16 @@ import logging -def monitor_log_file(log_path, state_dict, service_name, state_keywords, cond, state_times, start_time, stop_event): - logging.debug(f"Starting to monitor file '{log_path}' for {service_name}") +def monitor_log_file(log_path, state_dict, task_name, state_keywords, cond, state_times, start_time, stop_event): + logging.debug(f"Starting to monitor file '{log_path}' for {task_name}") if not state_keywords: - logging.debug(f"No state keywords for {service_name}, exiting monitor") + logging.debug(f"No state keywords for {task_name}, exiting monitor") return while not os.path.exists(log_path): if stop_event.is_set(): - logging.debug(f"Stop event set, exiting monitor for {service_name}") + logging.debug(f"Stop event set, exiting monitor for {task_name}") return time.sleep(0.1) @@ -32,13 +32,13 @@ def monitor_log_file(log_path, state_dict, service_name, state_keywords, cond, s for state in state_keywords: if state_keywords[state] in line: with cond: - state_dict[service_name] = state - local_state_times = state_times[service_name] + state_dict[task_name] = state + local_state_times = state_times[task_name] local_state_times[state] = current_time - state_times[service_name] = local_state_times + state_times[task_name] = local_state_times cond.notify_all() - logging.debug(f"{service_name} reached state '{state}' at {current_time}") + logging.debug(f"{task_name} reached state '{state}' at {current_time}") if state == last_state: reached_last_state = True @@ -47,4 +47,4 @@ def monitor_log_file(log_path, state_dict, service_name, state_keywords, cond, s if reached_last_state: break - logging.debug(f"Finished monitoring file '{log_path}' for {service_name}") + logging.debug(f"Finished monitoring file '{log_path}' for {task_name}") diff --git a/shepherd/program_executor.py b/shepherd/program_executor.py index 6717f97..3825072 100644 --- a/shepherd/program_executor.py +++ b/shepherd/program_executor.py @@ -9,12 +9,12 @@ from shepherd.logging_setup import setup_logging -def execute_program(config, working_dir, state_dict, service_name, cond, state_times, start_time, pgid_dict, +def execute_program(config, working_dir, state_dict, task_name, cond, state_times, start_time, pgid_dict, stop_event, logging_queue): setup_logging(logging_queue) def signal_handler(signum, frame): - logging.debug(f"Received signal {signum} in {service_name}") + logging.debug(f"Received signal {signum} in {task_name}") stop_event.set() signal.signal(signal.SIGINT, signal_handler) @@ -34,11 +34,11 @@ def signal_handler(signum, frame): file_dependency_mode = file_dependencies.get('mode', 'all') file_dependency_items = file_dependencies.get('items', []) - service_type = config.get('type', 'action') + task_type = config.get('type', 'action') with cond: - state_dict[service_name] = "initialized" - update_state_time(service_name, "initialized", start_time, state_times) + state_dict[task_name] = "initialized" + update_state_time(task_name, "initialized", start_time, state_times) cond.notify_all() if file_dependencies: @@ -53,25 +53,25 @@ def signal_handler(signum, frame): if stop_event.is_set(): with cond: - state_dict[service_name] = "stopped_before_execution" - update_state_time(service_name, "stopped_before_execution", start_time, state_times) + state_dict[task_name] = "stopped_before_execution" + update_state_time(task_name, "stopped_before_execution", start_time, state_times) cond.notify_all() return - logging.debug(f"Dependant file {file_path} found for service {service_name}") + logging.debug(f"Dependant file {file_path} found for task {task_name}") try: with cond: if dependency_mode == 'all': - for dep_service, required_state in dependencies.items(): - while required_state not in state_times.get(dep_service, {}) and not stop_event.is_set(): + for dep_task, required_state in dependencies.items(): + while required_state not in state_times.get(dep_task, {}) and not stop_event.is_set(): cond.wait() elif dependency_mode == 'any': satisfied = False while not satisfied and not stop_event.is_set(): - for dep_service, required_state in dependencies.items(): - if required_state in state_times.get(dep_service, {}): + for dep_task, required_state in dependencies.items(): + if required_state in state_times.get(dep_task, {}): satisfied = True break if not satisfied: @@ -79,21 +79,21 @@ def signal_handler(signum, frame): if stop_event.is_set(): with cond: - state_dict[service_name] = "stopped_before_execution" - update_state_time(service_name, "stopped_before_execution", start_time, state_times) + state_dict[task_name] = "stopped_before_execution" + update_state_time(task_name, "stopped_before_execution", start_time, state_times) cond.notify_all() return - logging.debug(f"Starting execution of '{service_type}' {service_name}") + logging.debug(f"Starting execution of '{task_type}' {task_name}") with cond: - state_dict[service_name] = "started" - update_state_time(service_name, "started", start_time, state_times) + state_dict[task_name] = "started" + update_state_time(task_name, "started", start_time, state_times) cond.notify_all() # Start the main log monitoring thread log_thread = threading.Thread(target=monitor_log_file, - args=(stdout_path, state_dict, service_name, stdout_states, cond, state_times, + args=(stdout_path, state_dict, task_name, stdout_states, cond, state_times, start_time, stop_event)) log_thread.start() @@ -102,7 +102,7 @@ def signal_handler(signum, frame): if file_path_to_monitor: file_monitor_thread = threading.Thread(target=monitor_log_file, args=( - file_path_to_monitor, state_dict, service_name, file_states, + file_path_to_monitor, state_dict, task_name, file_states, cond, state_times, start_time, stop_event)) file_monitor_thread.start() @@ -111,7 +111,7 @@ def signal_handler(signum, frame): with open(stdout_path, 'w') as out, open(stderr_path, 'w') as err: process = subprocess.Popen(command, shell=True, cwd=working_dir, stdout=out, stderr=err, preexec_fn=os.setsid) - pgid_dict[service_name] = os.getpgid(process.pid) + pgid_dict[task_name] = os.getpgid(process.pid) while process.poll() is None: time.sleep(0.1) @@ -122,31 +122,31 @@ def signal_handler(signum, frame): with cond: if stop_event.is_set() and return_code == -signal.SIGTERM: - state_dict[service_name] = "stopped" - update_state_time(service_name, "stopped", start_time, state_times) + state_dict[task_name] = "stopped" + update_state_time(task_name, "stopped", start_time, state_times) cond.notify_all() - if service_type == 'service' and not stop_event.is_set(): - logging.debug(f"Stopping execution of '{service_type}' {service_name}") + if task_type == 'service' and not stop_event.is_set(): + logging.debug(f"Stopping execution of '{task_type}' {task_name}") # If a service stops before receiving a stop event, mark it as failed with cond: - state_dict[service_name] = "failure" - update_state_time(service_name, "failure", start_time, state_times) + state_dict[task_name] = "failure" + update_state_time(task_name, "failure", start_time, state_times) cond.notify_all() - logging.debug(f"ERROR: Service {service_name} stopped unexpectedly, marked as failure.") + logging.debug(f"ERROR: Task {task_name} stopped unexpectedly, marked as failure.") - elif service_type == 'action': + elif task_type == 'action': action_state = "action_success" if return_code == 0 else "action_failure" with cond: - state_dict[service_name] = action_state - update_state_time(service_name, action_state, start_time, state_times) + state_dict[task_name] = action_state + update_state_time(task_name, action_state, start_time, state_times) cond.notify_all() with cond: - state_dict[service_name] = "final" - update_state_time(service_name, "final", start_time, state_times) + state_dict[task_name] = "final" + update_state_time(task_name, "final", start_time, state_times) cond.notify_all() if log_thread.is_alive(): @@ -156,16 +156,16 @@ def signal_handler(signum, frame): file_monitor_thread.join() except Exception as e: - logging.debug(f"Exception in executing {service_name}: {e}") + logging.debug(f"Exception in executing {task_name}: {e}") - logging.debug(f"Finished execution of {service_name}") + logging.debug(f"Finished execution of {task_name}") -def update_state_time(service_name, state, start_time, state_times): +def update_state_time(task_name, state, start_time, state_times): current_time = time.time() - start_time - local_state_times = state_times[service_name] + local_state_times = state_times[task_name] local_state_times[state] = current_time - state_times[service_name] = local_state_times + state_times[task_name] = local_state_times - logging.debug(f"Service '{service_name}' reached the state '{state}' at time {current_time:.3f}") + logging.debug(f"Task '{task_name}' reached the state '{state}' at time {current_time:.3f}") diff --git a/shepherd/service_manager.py b/shepherd/service_manager.py index 9eba48f..dd7222f 100644 --- a/shepherd/service_manager.py +++ b/shepherd/service_manager.py @@ -24,12 +24,12 @@ def save_state_times(state_times, output_file): json.dump(state_times_dict, f, indent=2) -class ServiceManager: +class TaskManager: def __init__(self, config_path, logging_queue): - logging.debug("Initializing ServiceManager") + logging.debug("Initializing TaskManager") self.config = load_and_preprocess_config(config_path) - self.services = self.config['services'] - self.sorted_services = validate_and_sort_programs(self.config) + self.tasks = self.config['tasks'] + self.sorted_tasks = validate_and_sort_programs(self.config) self.working_dir = os.path.dirname(os.path.abspath(config_path)) self.output = self.config['output'] self.stop_signal_path = os.path.join(self.working_dir, self.config.get('stop_signal', '')) @@ -42,7 +42,7 @@ def __init__(self, config_path, logging_queue): self.processes = {} self.logging_queue = logging_queue self.cleanup_command = self.config.get('cleanup_command', None) - logging.debug("ServiceManager initialized") + logging.debug("TaskManager initialized") def setup_signal_handlers(self): logging.debug("Setting up signal handlers") @@ -50,27 +50,27 @@ def setup_signal_handlers(self): signal.signal(signal.SIGINT, self.signal_handler) def signal_handler(self, signum, frame): - logging.debug(f"Received signal {signum} in pid {os.getpid()}, stopping all services...") + logging.debug(f"Received signal {signum} in pid {os.getpid()}, stopping all tasks...") self.stop_event.set() - def start_services(self, start_time): - logging.debug("Starting services") + def start_tasks(self, start_time): + logging.debug("Starting tasks") self.setup_signal_handlers() - for service in self.sorted_services: - service_config = self.services[service] + for task in self.sorted_tasks: + task_config = self.tasks[task] - self.state_dict[service] = "" - self.state_times[service] = {} + self.state_dict[task] = "" + self.state_times[task] = {} p_exec = Process(target=execute_program, args=( - service_config, self.working_dir, self.state_dict, service, self.cond, self.state_times, start_time, + task_config, self.working_dir, self.state_dict, task, self.cond, self.state_times, start_time, self.pgid_dict, self.stop_event, self.logging_queue)) p_exec.start() - self.processes[service] = p_exec + self.processes[task] = p_exec - logging.debug("All services initialized") + logging.debug("All tasks initialized") stop_thread = threading.Thread(target=self.check_stop_conditions, args=(start_time,)) stop_thread.start() @@ -89,17 +89,17 @@ def check_stop_conditions(self, start_time): logging.debug("Checking stop conditions") while not self.stop_event.is_set(): if (self.check_stop_signal_file() - or self.check_max_run_time(start_time) or self.check_all_services_final()): + or self.check_max_run_time(start_time) or self.check_all_tasks_final()): self.stop_event.set() else: self.stop_event.wait(timeout=1) - self.stop_all_services() + self.stop_all_tasks() logging.debug("Finished checking stop conditions") - def stop_all_services(self): - logging.debug("Stopping all services") + def stop_all_tasks(self): + logging.debug("Stopping all tasks") if self.cleanup_command: try: @@ -110,33 +110,33 @@ def stop_all_services(self): except subprocess.CalledProcessError as e: logging.error(f"System cleanup command failed: {e}") - for service_name, process in self.processes.items(): - pgid = self.pgid_dict.get(service_name) + for task_name, process in self.processes.items(): + pgid = self.pgid_dict.get(task_name) if pgid: try: os.killpg(pgid, signal.SIGTERM) except ProcessLookupError: - logging.debug(f"Process group {pgid} for service {service_name} not found.") + logging.debug(f"Process group {pgid} for task {task_name} not found.") # process.terminate() for process in self.processes.values(): process.join() - logging.debug("All services have been stopped") + logging.debug("All tasks have been stopped") - def stop_service(self, service_name): - if service_name in self.processes: - process = self.processes[service_name] + def stop_task(self, task_name): + if task_name in self.processes: + process = self.processes[task_name] if process.is_alive(): - pgid = self.pgid_dict.get(service_name) + pgid = self.pgid_dict.get(task_name) if pgid: os.killpg(pgid, signal.SIGTERM) # Terminate the process group process.terminate() process.join() - logging.debug(f"Service {service_name} has been stopped.") + logging.debug(f"Task {task_name} has been stopped.") else: - logging.debug(f"Service {service_name} not found.") + logging.debug(f"Task {task_name} not found.") def check_stop_signal_file(self): if os.path.exists(self.stop_signal_path) and os.path.isfile(self.stop_signal_path): @@ -147,11 +147,11 @@ def check_max_run_time(self, start_time): if self.max_run_time: current_time = time.time() if (current_time - start_time) > self.max_run_time: - logging.debug("Maximum runtime exceeded. Stopping all services.") + logging.debug("Maximum runtime exceeded. Stopping all tasks.") return True return False - def check_all_services_final(self): + def check_all_tasks_final(self): for state in self.state_dict.values(): if state != 'final': return False diff --git a/shepherd/shepherd.py b/shepherd/shepherd.py index f3454b9..4511f55 100644 --- a/shepherd/shepherd.py +++ b/shepherd/shepherd.py @@ -6,7 +6,7 @@ import logging.handlers import argparse -from shepherd.service_manager import ServiceManager +from shepherd.service_manager import TaskManager from shepherd.logging_setup import setup_logging, listener_process from shepherd._version import __version__ # Import the version @@ -32,8 +32,8 @@ def main(): setup_logging(logging_queue) logging.debug("Starting main") - service_manager = ServiceManager(config_path, logging_queue) - service_manager.start_services(start_time) + service_manager = TaskManager(config_path, logging_queue) + service_manager.start_tasks(start_time) logging.debug("Exiting main") logging_queue.put(None) # Send None to the listener to stop it diff --git a/shepherd/shepherd_viz.py b/shepherd/shepherd_viz.py index fe14790..fc6497b 100755 --- a/shepherd/shepherd_viz.py +++ b/shepherd/shepherd_viz.py @@ -75,7 +75,7 @@ def generate_state_transition_graph(config, state_transition, output_prefix, out } # Add subgraphs for each service with their state transitions - for service, details in config['services'].items(): + for service, details in config['tasks'].items(): service_type = details.get('type', 'action') # Default to 'action' if type is not specified node_color = colors.get(service_type, 'lightgrey') # Default color if no specific type is found @@ -96,7 +96,7 @@ def generate_state_transition_graph(config, state_transition, output_prefix, out sub.attr(label=service) # Add edges based on dependencies - for service, details in config['services'].items(): + for service, details in config['tasks'].items(): dependencies = details.get('dependency', {}).get('items', {}) for dep, state in dependencies.items(): dot.edge(f'{dep}_{state}', f'{service}_started', label=f'{dep} {state}') @@ -115,7 +115,7 @@ def generate_workflow_graph(config, output_prefix, output_format='png'): } # Add nodes with color based on service type - for service, details in config['services'].items(): + for service, details in config['tasks'].items(): service_type = details.get('type', 'action') # Default to 'service' if type is not specified node_color = colors.get(service_type, 'lightgrey') # Default color if no specific type is found dot.node(service, service, shape='box', style='filled', color=node_color) @@ -156,4 +156,3 @@ def main(): if __name__ == '__main__': main() -q \ No newline at end of file diff --git a/test/example2/program3.sh b/test/example2/program3.sh deleted file mode 100755 index f5e7654..0000000 --- a/test/example2/program3.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash -START_TIME=$(date +%s) -startup_delay=5 - -while true; do - echo "$(date +%s) - program is booting up" - sleep 0.5 - - if [[ $(date +%s) -gt $((START_TIME + startup_delay)) ]]; then - echo "$(date +%s) - program is ready" - break - fi -done - -READY_TIME=$(date +%s) -run_duration=30 - -while true; do - echo "$(date +%s) - program is running" - sleep 0.5 - - if [[ $(date +%s) -gt $((READY_TIME + run_duration)) ]]; then - echo "$(date +%s) - program is completed" - break - fi -done diff --git a/test/example2/shepherd-config.yml b/test/example2/shepherd-config.yml deleted file mode 100644 index 74f91c7..0000000 --- a/test/example2/shepherd-config.yml +++ /dev/null @@ -1,27 +0,0 @@ -services: - program1: - command: "./program1.sh" - state: - log: - ready: "program is ready" - complete: "program is completed" - program2: - command: "./program2.sh" - state: - log: - ready: "program is ready" - complete: "program is completed" - dependency: - mode: "all" - items: - program1: "ready" - program3: "complete" - program3: - command: "./program3.sh" - state: - log: - ready: "program is ready" - complete: "program is completed" -output: - state_times: "state_transition_times.json" -max_run_time: 120 diff --git a/test/example3/program1.sh b/test/example3/program1.sh deleted file mode 100755 index f5e7654..0000000 --- a/test/example3/program1.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash -START_TIME=$(date +%s) -startup_delay=5 - -while true; do - echo "$(date +%s) - program is booting up" - sleep 0.5 - - if [[ $(date +%s) -gt $((START_TIME + startup_delay)) ]]; then - echo "$(date +%s) - program is ready" - break - fi -done - -READY_TIME=$(date +%s) -run_duration=30 - -while true; do - echo "$(date +%s) - program is running" - sleep 0.5 - - if [[ $(date +%s) -gt $((READY_TIME + run_duration)) ]]; then - echo "$(date +%s) - program is completed" - break - fi -done diff --git a/test/example3/program2.sh b/test/example3/program2.sh deleted file mode 100755 index f5e7654..0000000 --- a/test/example3/program2.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash -START_TIME=$(date +%s) -startup_delay=5 - -while true; do - echo "$(date +%s) - program is booting up" - sleep 0.5 - - if [[ $(date +%s) -gt $((START_TIME + startup_delay)) ]]; then - echo "$(date +%s) - program is ready" - break - fi -done - -READY_TIME=$(date +%s) -run_duration=30 - -while true; do - echo "$(date +%s) - program is running" - sleep 0.5 - - if [[ $(date +%s) -gt $((READY_TIME + run_duration)) ]]; then - echo "$(date +%s) - program is completed" - break - fi -done diff --git a/test/example3/run_test.sh b/test/example3/run_test.sh deleted file mode 100644 index 39aeb5d..0000000 --- a/test/example3/run_test.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -echo "Starting test ..." - -shepherd -c shepherd-config.yml - -echo "Completed test"