forked from globus/globus-flows-trigger-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwatch.py
87 lines (69 loc) · 2.83 KB
/
watch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import os
import sys
import time
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
class FileTrigger:
def __init__(self, watch_dir, patterns, FlowRunner=None):
self.observer = Observer()
self.watch_dir = watch_dir
self.patterns = patterns
self.FlowRunner = FlowRunner
def run(self):
print("Watcher Started\n")
if not self.FlowRunner:
print("No callback function defined for events.")
print("Using system print()")
self.FlowRunner = print
if not os.path.isdir(self.watch_dir):
print("Watch directory does not exist.")
os.mkdir(self.watch_dir)
print("Directory " + self.watch_dir + " was created")
os.chdir(self.watch_dir)
print(f"Monitoring: {self.watch_dir}\n")
event_handler = Handler(self.FlowRunner, self.patterns)
self.observer.schedule(event_handler, self.watch_dir, recursive=True)
self.observer.start()
try:
while True:
time.sleep(1)
except:
self.observer.stop()
print("Watcher stopped.")
self.observer.join()
class Handler(FileSystemEventHandler):
def __init__(self, FlowRunner, patterns):
super(FileSystemEventHandler).__init__()
self.logic_function = FlowRunner
self.patterns = patterns
# This is the callback function for file events.
# You can edit it to trigger at file creation, modification or deletion,
# and have different behaviors for each.
def on_any_event(self, event):
if event.event_type == "created":
if event.is_directory:
# print("directory created")
# self.logic_function(event.src_path)
return None
else:
print(f"File created: {os.path.basename(event.src_path)}")
for pattern in self.patterns:
if event.src_path.endswith(pattern):
print(f"File ends with {pattern}")
print("Starting flow...")
self.logic_function(event.src_path)
return None
# elif (event.event_type == 'modified'):
# self.logic_function(event.src_path)
# return None
def translate_local_path_to_globus_path(local_path: str) -> str:
r"""Translate a local path to a Globus-compatible path.
On Windows, the local directory will be something like `C:\path\to\watch`.
This must be converted to /c/path/to/watch/ when referenced as a Globus path.
"""
if not sys.platform.lower().startswith("win"):
return local_path
drive, _, path = local_path.partition(":")
drive = drive.lower()
path = path.replace("\\", "/").strip("/")
return f"/{drive}/{path}/"