-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.py
212 lines (181 loc) · 6.28 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import os
import toml
import logging
import sqlite3
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from datetime import datetime
from subprocess import Popen, PIPE
from models import init_db
from condition_parser import evaluate_condition
import web_ui
from daemon import DaemonContext
from utils import get_valid_directory
# Logging setup: every record is appended to <base>/logs/scheduler.log, where
# <base> is whatever utils.get_valid_directory() returns.
logs_path = get_valid_directory()
_log_dir = os.path.join(str(logs_path), "logs")
LOG_FILE = os.path.join(_log_dir, "scheduler.log")
os.makedirs(_log_dir, exist_ok=True)  # create the logs directory on first run
logging.basicConfig(
    filename=LOG_FILE,
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

# Module-level state shared by the functions below.
scheduler = BackgroundScheduler()  # started by start_daemon()
CONFIG = {}  # filled in by load_config()
# Load configuration
def load_config(config_file="config.toml"):
    """
    Parse *config_file* (TOML) into the module-level ``CONFIG`` and return it.

    A missing ``settings`` table is created empty, and ``settings.db_path``
    falls back to ``"jobs.db"`` when the file does not provide one.

    Raises:
        FileNotFoundError: if *config_file* does not exist.
    """
    global CONFIG

    # Guard clause: fail before CONFIG is touched when there is no file.
    if not os.path.exists(config_file):
        raise FileNotFoundError(f"Configuration file '{config_file}' not found.")

    CONFIG = toml.load(config_file)

    # Fill in the defaults the rest of the module relies on.
    settings = CONFIG.setdefault("settings", {})
    if "db_path" not in settings:
        settings["db_path"] = "jobs.db"
    return CONFIG
def write_pid(pid_file):
    """
    Store the PID of the running process in *pid_file*.

    The file is opened in write mode, so previous content is replaced. An
    INFO record naming the PID and the file is logged afterwards.
    """
    pid = os.getpid()
    with open(pid_file, "w") as handle:
        handle.write(str(pid))
    logging.info(f"Daemon PID {pid} written to {pid_file}.")
def remove_pid(pid_file):
    """
    Delete *pid_file* when it exists; otherwise do nothing.
    """
    # Nothing to clean up (e.g. the file was never written or already removed).
    if not os.path.exists(pid_file):
        return
    os.remove(pid_file)
    logging.info(f"PID file {pid_file} removed.")
# Job Execution
def _load_env_file(env_file):
    """
    Parse ``KEY=VALUE`` lines from *env_file* into a dict.

    Blank lines and comment lines (``#``, with or without leading
    whitespace) are ignored. Lines without ``=`` or with an empty key are
    skipped with a warning instead of aborting the job. Whitespace around
    the key and the value is stripped.
    """
    variables = {}
    with open(env_file) as f:
        for raw_line in f:
            line = raw_line.strip()
            # Test the stripped line so indented comments are recognised too.
            if not line or line.startswith("#"):
                continue
            key, separator, value = line.partition("=")
            key = key.strip()
            if not separator or not key:
                logging.warning(f"Ignoring malformed line in env file {env_file}: {line!r}")
                continue
            variables[key] = value.strip()
    return variables


def run_job(job_id, interpreter, command, env_file=None):
    """
    Execute the job's command and log its output, exit code, and execution time.

    Args:
        job_id: Identifier recorded with the execution logs.
        interpreter: Executable that is invoked as ``interpreter -c command``.
        command: Code/command string handed to the interpreter via ``-c``.
        env_file: Optional path to a ``KEY=VALUE`` file whose entries are
            layered on top of a copy of the current process environment.
            A missing file is silently ignored.
    """
    start_time = datetime.now()

    # Start from the daemon's own environment and overlay the env file.
    env = os.environ.copy()
    if env_file and os.path.exists(env_file):
        env.update(_load_env_file(env_file))

    # NOTE(review): if the interpreter cannot be launched, Popen raises OSError
    # here and neither log_to_db nor log_to_file is reached for this run.
    process = Popen([interpreter, "-c", command], stdout=PIPE, stderr=PIPE, env=env)
    stdout, stderr = process.communicate()
    exit_code = process.returncode

    # Record how the run went in both the database and the log file.
    execution_time = (datetime.now() - start_time).total_seconds()
    log_to_db(job_id, exit_code, execution_time)
    log_to_file(job_id, exit_code, execution_time, stdout, stderr)
def log_to_db(job_id, exit_code, execution_time):
    """
    Log job execution details to SQLite database.

    Inserts one row into ``job_execution_logs`` in the database at
    ``CONFIG["settings"]["db_path"]``.

    Args:
        job_id: Identifier of the job that ran.
        exit_code: Exit status of the job's process.
        execution_time: Run duration in seconds.
    """
    # Pass the timestamp as text ("YYYY-MM-DD HH:MM:SS.ffffff"). This is the
    # same string sqlite3's implicit datetime adapter produced, without
    # relying on that adapter (deprecated since Python 3.12).
    timestamp = datetime.now().isoformat(sep=" ")

    conn = sqlite3.connect(CONFIG["settings"]["db_path"])
    try:
        conn.execute(
            """
            INSERT INTO job_execution_logs (job_id, exit_code, execution_time, timestamp)
            VALUES (?, ?, ?, ?)
            """,
            (job_id, exit_code, execution_time, timestamp),
        )
        conn.commit()
    finally:
        # Always release the connection, even when the insert fails.
        conn.close()
def log_to_file(job_id, exit_code, execution_time, stdout, stderr):
    """
    Log job execution details to a log file.

    Appends a summary line to ``LOG_FILE``, followed by the captured
    stdout/stderr when they are non-empty.

    Args:
        job_id: Identifier of the job that ran.
        exit_code: Exit status of the job's process.
        execution_time: Run duration in seconds.
        stdout: Raw bytes captured from the job's standard output.
        stderr: Raw bytes captured from the job's standard error.
    """
    with open(LOG_FILE, "a") as log:
        log.write(f"[{datetime.now()}] Job {job_id}: Exit Code={exit_code}, Execution Time={execution_time}s\n")
        # Job output is arbitrary bytes; replace undecodable sequences rather
        # than letting UnicodeDecodeError abort the logging of the run.
        if stdout:
            stdout_text = stdout.decode(errors="replace")
            log.write(f"STDOUT:\n{stdout_text}\n")
        if stderr:
            stderr_text = stderr.decode(errors="replace")
            log.write(f"STDERR:\n{stderr_text}\n")
# Schedule Jobs
def schedule_jobs(jobs):
    """
    Register the jobs in *jobs* with the module-level APScheduler instance.

    *jobs* maps a job id to that job's settings. A job is skipped when no
    interpreter is configured for its ``type`` or when it has a ``condition``
    that evaluates falsy at scheduling time. Jobs with ``schedule_type``
    ``"cron"`` (the default) use the crontab string in ``schedule``; any other
    value uses ``interval_seconds``.
    """
    for job_id, spec in jobs.items():
        # Resolve the executable for this job type; an empty result means "none".
        interpreter = CONFIG["interpreters"].get(spec["type"], "")
        if not interpreter:
            logging.warning(f"Interpreter for job {job_id} not found.")
            continue

        # An optional condition gates whether the job is scheduled at all.
        condition = spec.get("condition")
        if condition and not evaluate_condition(
            condition, CONFIG["settings"]["db_path"], job_id
        ):
            logging.info(f"Skipping job {job_id} because its condition is not met.")
            continue

        # Choose the trigger: crontab expression or fixed interval.
        use_cron = spec.get("schedule_type", "cron") == "cron"
        trigger = (
            CronTrigger.from_crontab(spec["schedule"])
            if use_cron
            else IntervalTrigger(seconds=spec["interval_seconds"])
        )

        scheduler.add_job(
            func=run_job,
            args=[job_id, interpreter, spec["command"], spec.get("env_file")],
            trigger=trigger,
            id=job_id,
            name=spec.get("name", f"Job {job_id}"),
            replace_existing=True,  # re-registering an id overwrites the old job
        )
def start_daemon(daemonize=False):
    """
    Start the job scheduler daemon.

    Initialises the database, schedules the configured jobs, then writes the
    PID file, starts the web interface on a background thread, starts the
    scheduler, and blocks until the web thread ends. The PID file is removed
    on the way out.

    ``CONFIG`` must already be populated (see ``load_config``).

    Args:
        daemonize: When true, run the serving part inside ``DaemonContext``;
            otherwise stay in the foreground.
    """
    # Function-scope import, matching how this file imports Thread elsewhere.
    from threading import Thread

    init_db(CONFIG["settings"]["db_path"])
    # The jobs table is optional: no jobs simply means nothing is scheduled.
    schedule_jobs(CONFIG.get("jobs", {}))

    # Get the PID file path from config
    pid_file = CONFIG["settings"].get("pid_file", "/tmp/avscheduler.pid")

    # Web interface bind address; the table and both keys are optional.
    web_settings = CONFIG.get("web_server", {})
    web_host = web_settings.get("host", "127.0.0.1")
    web_port = web_settings.get("port", 5000)

    def start_flask():
        web_ui.app.run(host=web_host, port=web_port, debug=False)

    def serve():
        # Shared by the daemonized and foreground paths.
        write_pid(pid_file)
        # Run the web UI with the configured host/port. Previously a helper
        # was used that ignored both and always bound to Flask's defaults.
        flask_thread = Thread(target=start_flask)
        flask_thread.daemon = True
        flask_thread.start()
        try:
            scheduler.start()
            flask_thread.join()
        finally:
            remove_pid(pid_file)

    if daemonize:
        # NOTE(review): if `daemon` is python-daemon, DaemonContext() by
        # default changes the working directory to "/" and closes open file
        # descriptors. Relative paths such as the default "jobs.db" and the
        # log file opened by logging.basicConfig may therefore stop working
        # after daemonization. Confirm, and consider passing
        # working_directory= / files_preserve=.
        with DaemonContext():
            serve()
    else:
        serve()
def start_flask_in_thread():
    """
    Run ``web_ui.app.run(debug=False)`` on a daemon thread.

    Returns:
        The already-started ``threading.Thread``.
    """
    import threading

    def serve():
        web_ui.app.run(debug=False)

    # daemon=True: the thread does not keep the process alive on its own.
    worker = threading.Thread(target=serve, daemon=True)
    worker.start()
    return worker
if __name__ == "__main__":
    # CONFIG starts out as an empty dict and start_daemon() indexes
    # CONFIG["settings"] right away, so the configuration has to be loaded
    # first or the script dies with KeyError: 'settings'.
    load_config()
    start_daemon(daemonize=True)