Skip to content

Commit

Permalink
Renamed services to task
Browse files Browse the repository at this point in the history
  • Loading branch information
saifulislampi committed Oct 14, 2024
1 parent 88999ca commit 3f76457
Show file tree
Hide file tree
Showing 33 changed files with 136 additions and 221 deletions.
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,18 @@ With this simple configuration, Shepherd will:

```json
{
"program1": {
"initialized": 0.246384859085083,
"started": 0.24660515785217285,
"action_success": 5.349443197250366,
"final": 5.350545883178711
"my_service": {
"initialized": 0.19824790954589844,
"started": 0.19916582107543945,
"ready": 5.461252927780151,
"stopped": 60.36078095436096,
"final": 60.36192083358765
},
"program2": {
"initialized": 0.2456510066986084,
"started": 5.351618051528931,
"action_success": 10.464960098266602,
"final": 10.465446949005127
"my_action": {
"initialized": 0.20003700256347656,
"started": 5.466093063354492,
"action_success": 10.657495021820068,
"final": 10.658098936080933
}
}
```
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
services:
tasks:
program1:
command: "./program1.sh"
program2:
Expand Down
5 changes: 5 additions & 0 deletions examples/example2/action.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
# Example action task: report start, simulate five seconds of work, report completion.
printf '%s\n' "Action is running..."
sleep 5
printf '%s\n' "Action completed"
6 changes: 6 additions & 0 deletions examples/example2/service.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash
# Example service task: takes five seconds to come up, then prints the readiness line.
printf '%s\n' "Service is starting..."
sleep 5
printf '%s\n' "Service is ready"  # matched by the `ready` entry in shepherd-config.yml
tail -f /dev/null # Block indefinitely so the service stays alive after reporting ready
16 changes: 16 additions & 0 deletions examples/example2/shepherd-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
tasks:
  my_service:
    type: "service"  # a service that exits before a stop event is marked as "failure"
    command: "./service.sh"
    state:
      log:
        ready: "Service is ready"  # log line that signals the "ready" state
  my_action:
    type: "action"  # ends in "action_success" or "action_failure" based on exit code
    command: "./action.sh"
    dependency:
      items:
        my_service: "ready"  # wait until my_service has reached "ready" before starting
output:
  state_times: "state_transition_times.json"
max_run_time: 60  # NOTE(review): assumed to be a top-level key, in seconds - confirm
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
services:
tasks:
program1:
command: "./program1.sh"
state:
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
24 changes: 12 additions & 12 deletions shepherd/config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ def load_and_preprocess_config(filepath):

def preprocess_config(config, config_path):
"""Automatically fills in missing stdout_path and stderr_path paths."""
services = config.get('services', {})
tasks = config.get('tasks', {})
stdout_dir = config.get('output', {}).get('stdout_dir', '')
working_dir = os.path.dirname(os.path.abspath(config_path))

for service_name, details in services.items():
for task_name, details in tasks.items():
# Auto-fill log and error files if not specified
if 'stdout_path' not in details:
details['stdout_path'] = f"{service_name}_stdout.log"
details['stdout_path'] = f"{task_name}_stdout.log"
if 'stderr_path' not in details:
details['stderr_path'] = f"{service_name}_stderr.log"
details['stderr_path'] = f"{task_name}_stderr.log"

if stdout_dir:
details['stdout_path'] = os.path.join(stdout_dir, details['stdout_path'])
Expand All @@ -45,23 +45,23 @@ def preprocess_config(config, config_path):

def validate_and_sort_programs(config):
logging.debug("Validating and sorting programs")
required_keys = ['services']
required_keys = ['tasks']

for key in required_keys:
if key not in config:
raise ValueError(f"Missing required key: {key}")

services = config['services']
tasks = config['tasks']

for service, details in services.items():
for task, details in tasks.items():
if 'command' not in details:
raise ValueError(f"Program {service} is missing the 'command' key")
raise ValueError(f"Program {task} is missing the 'command' key")
if 'stdout_path' not in details:
raise ValueError(f"Program {service} is missing the 'stdout_path' key")
raise ValueError(f"Program {task} is missing the 'stdout_path' key")

sorted_services = topological_sort(services)
logging.debug(f"Sorted services: {sorted_services}")
return sorted_services
sorted_tasks = topological_sort(tasks)
logging.debug(f"Sorted tasks: {sorted_tasks}")
return sorted_tasks


def topological_sort(programs):
Expand Down
18 changes: 9 additions & 9 deletions shepherd/log_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
import logging


def monitor_log_file(log_path, state_dict, service_name, state_keywords, cond, state_times, start_time, stop_event):
logging.debug(f"Starting to monitor file '{log_path}' for {service_name}")
def monitor_log_file(log_path, state_dict, task_name, state_keywords, cond, state_times, start_time, stop_event):
logging.debug(f"Starting to monitor file '{log_path}' for {task_name}")

if not state_keywords:
logging.debug(f"No state keywords for {service_name}, exiting monitor")
logging.debug(f"No state keywords for {task_name}, exiting monitor")
return

while not os.path.exists(log_path):
if stop_event.is_set():
logging.debug(f"Stop event set, exiting monitor for {service_name}")
logging.debug(f"Stop event set, exiting monitor for {task_name}")
return
time.sleep(0.1)

Expand All @@ -32,13 +32,13 @@ def monitor_log_file(log_path, state_dict, service_name, state_keywords, cond, s
for state in state_keywords:
if state_keywords[state] in line:
with cond:
state_dict[service_name] = state
local_state_times = state_times[service_name]
state_dict[task_name] = state
local_state_times = state_times[task_name]
local_state_times[state] = current_time
state_times[service_name] = local_state_times
state_times[task_name] = local_state_times
cond.notify_all()

logging.debug(f"{service_name} reached state '{state}' at {current_time}")
logging.debug(f"{task_name} reached state '{state}' at {current_time}")

if state == last_state:
reached_last_state = True
Expand All @@ -47,4 +47,4 @@ def monitor_log_file(log_path, state_dict, service_name, state_keywords, cond, s
if reached_last_state:
break

logging.debug(f"Finished monitoring file '{log_path}' for {service_name}")
logging.debug(f"Finished monitoring file '{log_path}' for {task_name}")
76 changes: 38 additions & 38 deletions shepherd/program_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
from shepherd.logging_setup import setup_logging


def execute_program(config, working_dir, state_dict, service_name, cond, state_times, start_time, pgid_dict,
def execute_program(config, working_dir, state_dict, task_name, cond, state_times, start_time, pgid_dict,
stop_event, logging_queue):
setup_logging(logging_queue)

def signal_handler(signum, frame):
logging.debug(f"Received signal {signum} in {service_name}")
logging.debug(f"Received signal {signum} in {task_name}")
stop_event.set()

signal.signal(signal.SIGINT, signal_handler)
Expand All @@ -34,11 +34,11 @@ def signal_handler(signum, frame):
file_dependency_mode = file_dependencies.get('mode', 'all')
file_dependency_items = file_dependencies.get('items', [])

service_type = config.get('type', 'action')
task_type = config.get('type', 'action')

with cond:
state_dict[service_name] = "initialized"
update_state_time(service_name, "initialized", start_time, state_times)
state_dict[task_name] = "initialized"
update_state_time(task_name, "initialized", start_time, state_times)
cond.notify_all()

if file_dependencies:
Expand All @@ -53,47 +53,47 @@ def signal_handler(signum, frame):

if stop_event.is_set():
with cond:
state_dict[service_name] = "stopped_before_execution"
update_state_time(service_name, "stopped_before_execution", start_time, state_times)
state_dict[task_name] = "stopped_before_execution"
update_state_time(task_name, "stopped_before_execution", start_time, state_times)
cond.notify_all()
return

logging.debug(f"Dependant file {file_path} found for service {service_name}")
logging.debug(f"Dependant file {file_path} found for task {task_name}")

try:
with cond:
if dependency_mode == 'all':
for dep_service, required_state in dependencies.items():
while required_state not in state_times.get(dep_service, {}) and not stop_event.is_set():
for dep_task, required_state in dependencies.items():
while required_state not in state_times.get(dep_task, {}) and not stop_event.is_set():
cond.wait()

elif dependency_mode == 'any':
satisfied = False
while not satisfied and not stop_event.is_set():
for dep_service, required_state in dependencies.items():
if required_state in state_times.get(dep_service, {}):
for dep_task, required_state in dependencies.items():
if required_state in state_times.get(dep_task, {}):
satisfied = True
break
if not satisfied:
cond.wait()

if stop_event.is_set():
with cond:
state_dict[service_name] = "stopped_before_execution"
update_state_time(service_name, "stopped_before_execution", start_time, state_times)
state_dict[task_name] = "stopped_before_execution"
update_state_time(task_name, "stopped_before_execution", start_time, state_times)
cond.notify_all()
return

logging.debug(f"Starting execution of '{service_type}' {service_name}")
logging.debug(f"Starting execution of '{task_type}' {task_name}")

with cond:
state_dict[service_name] = "started"
update_state_time(service_name, "started", start_time, state_times)
state_dict[task_name] = "started"
update_state_time(task_name, "started", start_time, state_times)
cond.notify_all()

# Start the main log monitoring thread
log_thread = threading.Thread(target=monitor_log_file,
args=(stdout_path, state_dict, service_name, stdout_states, cond, state_times,
args=(stdout_path, state_dict, task_name, stdout_states, cond, state_times,
start_time, stop_event))
log_thread.start()

Expand All @@ -102,7 +102,7 @@ def signal_handler(signum, frame):
if file_path_to_monitor:
file_monitor_thread = threading.Thread(target=monitor_log_file,
args=(
file_path_to_monitor, state_dict, service_name, file_states,
file_path_to_monitor, state_dict, task_name, file_states,
cond,
state_times, start_time, stop_event))
file_monitor_thread.start()
Expand All @@ -111,7 +111,7 @@ def signal_handler(signum, frame):
with open(stdout_path, 'w') as out, open(stderr_path, 'w') as err:
process = subprocess.Popen(command, shell=True, cwd=working_dir, stdout=out, stderr=err,
preexec_fn=os.setsid)
pgid_dict[service_name] = os.getpgid(process.pid)
pgid_dict[task_name] = os.getpgid(process.pid)

while process.poll() is None:
time.sleep(0.1)
Expand All @@ -122,31 +122,31 @@ def signal_handler(signum, frame):

with cond:
if stop_event.is_set() and return_code == -signal.SIGTERM:
state_dict[service_name] = "stopped"
update_state_time(service_name, "stopped", start_time, state_times)
state_dict[task_name] = "stopped"
update_state_time(task_name, "stopped", start_time, state_times)
cond.notify_all()

if service_type == 'service' and not stop_event.is_set():
logging.debug(f"Stopping execution of '{service_type}' {service_name}")
if task_type == 'service' and not stop_event.is_set():
logging.debug(f"Stopping execution of '{task_type}' {task_name}")

# If a service stops before receiving a stop event, mark it as failed
with cond:
state_dict[service_name] = "failure"
update_state_time(service_name, "failure", start_time, state_times)
state_dict[task_name] = "failure"
update_state_time(task_name, "failure", start_time, state_times)
cond.notify_all()
logging.debug(f"ERROR: Service {service_name} stopped unexpectedly, marked as failure.")
logging.debug(f"ERROR: Task {task_name} stopped unexpectedly, marked as failure.")

elif service_type == 'action':
elif task_type == 'action':
action_state = "action_success" if return_code == 0 else "action_failure"

with cond:
state_dict[service_name] = action_state
update_state_time(service_name, action_state, start_time, state_times)
state_dict[task_name] = action_state
update_state_time(task_name, action_state, start_time, state_times)
cond.notify_all()

with cond:
state_dict[service_name] = "final"
update_state_time(service_name, "final", start_time, state_times)
state_dict[task_name] = "final"
update_state_time(task_name, "final", start_time, state_times)
cond.notify_all()

if log_thread.is_alive():
Expand All @@ -156,16 +156,16 @@ def signal_handler(signum, frame):
file_monitor_thread.join()

except Exception as e:
logging.debug(f"Exception in executing {service_name}: {e}")
logging.debug(f"Exception in executing {task_name}: {e}")

logging.debug(f"Finished execution of {service_name}")
logging.debug(f"Finished execution of {task_name}")


def update_state_time(service_name, state, start_time, state_times):
def update_state_time(task_name, state, start_time, state_times):
current_time = time.time() - start_time

local_state_times = state_times[service_name]
local_state_times = state_times[task_name]
local_state_times[state] = current_time
state_times[service_name] = local_state_times
state_times[task_name] = local_state_times

logging.debug(f"Service '{service_name}' reached the state '{state}' at time {current_time:.3f}")
logging.debug(f"Task '{task_name}' reached the state '{state}' at time {current_time:.3f}")
Loading

0 comments on commit 3f76457

Please sign in to comment.