Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a node to keep track of files #647

Merged
merged 2 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sisl/nodes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .context import SISL_NODES_CONTEXT, NodeContext, temporal_context
from .file_nodes import FileNode
from .node import Node
from .utils import nodify_module
from .workflow import Workflow
176 changes: 176 additions & 0 deletions src/sisl/nodes/file_nodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
from pathlib import Path

try:
import watchdog.events
import watchdog.observers

WATCHDOG_IMPORTED = True
except ImportError:
WATCHDOG_IMPORTED = False

from sisl.messages import warn

from .node import Node

if WATCHDOG_IMPORTED:

class CallbackHandler(watchdog.events.PatternMatchingEventHandler):
def __init__(self, callback_obj, *args, **kwargs):
super().__init__(*args, **kwargs)
self.callback_obj = callback_obj

def _run_method(self, method_name, event):

Check notice

Code scanning / CodeQL

Explicit returns mixed with implicit (fall through) returns Note

Mixing implicit and explicit returns may indicate an error as implicit returns always return None.
if not hasattr(self.callback_obj, "matches_path"):
return
if not self.callback_obj.matches_path(event.src_path):
return

func = getattr(self.callback_obj, method_name, None)
if callable(func):
return func(event)

def on_modified(self, event):
self._run_method("on_file_modified", event)

def on_created(self, event):
self._run_method("on_file_created", event)

def on_moved(self, event):
self._run_method("on_file_moved", event)

def on_deleted(self, event):
self._run_method("on_file_deleted", event)


class FileNode(Node):
"""Node that listens to changes to a given file.

If the file changes, the node will be marked as outdated and the signal
will be propagated to all downstream nodes.

Therefore, if autoupdate is enabled at some point down the tree, an
update will be triggered.

This node requires `watchdog` to be installed in order to be fully
functional. Otherwise, it will not fail but it won't do anything.

Parameters
----------
path :
Path to the file to watch.

Examples
--------

.. code-block:: python
from sisl.nodes import Node, FileNode
import time

# Write something to file
with open("testfile", "w") as f:
f.write("HELLO")

# Create a FileNode
n = FileNode("testfile")

# Define a file reader node that prints the contents of the file
@Node.from_func
def print_contents(path):
print("printing contents...")
with open(path, "r") as f:
print(f.read())

# And initialize it by passing the FileNode as an input
printer = print_contents(path=n)

print("---RUNNING NODE")

# Run the printer node
printer.get()

print("--- SET AUTOMATIC UPDATING")

# Now set it to automatically update on changes to upstream inputs
printer.context.update(lazy=False)

print("--- APPENDING TO FILE")

# Append to the file which will trigger the update.
with open("testfile", "a") as f:
f.write("\nBYE")

# Give some time for the printer to react before exiting
time.sleep(1)

This should give the following output:

.. code-block::

---RUNNING NODE
printing contents...
HELLO
--- SET AUTOMATIC UPDATING
--- APPENDING TO FILE
printing contents...
HELLO
BYE

"""

def setup(self, *args, **kwargs):
super().setup(*args, **kwargs)

self._setup_observer()

def _setup_observer(self):
"""Sets up the watchdog observer."""
if not WATCHDOG_IMPORTED:
warn(
"Watchdog is not installed. {self.__class__.__name__} will not be able to detect changes in files."
)
else:
# Watchdog watches directories so we should watch the parent
parent_path = Path(self.inputs["path"]).parent

self.observer = watchdog.observers.Observer()
handler = CallbackHandler(self)
self.observer.schedule(handler, parent_path)
self.observer.start()

def _update_observer(self):
"""Updates the observer to watch the (possibly) new path."""
if not WATCHDOG_IMPORTED:
return

self.observer.unschedule_all()
parent_path = Path(self.inputs["path"]).parent
self.observer.schedule(CallbackHandler(self), parent_path)

# Methods to interact with the observer
def matches_path(self, path: str):
return Path(path) == Path(self.inputs["path"])

def on_file_modified(self, event):
self.on_file_change(event)

def on_file_created(self, event):
self.on_file_change(event)

def on_file_moved(self, event):
self.on_file_change(event)

def on_file_deleted(self, event):
self.on_file_change(event)

def on_file_change(self, event):
self._receive_outdated()

def update_inputs(self, **inputs):
# We wrap the update_inputs method to update the observer if the path
# changes.
super().update_inputs(**inputs)
if "path" in inputs:
self._update_observer()

def function(self, path: str) -> Path:
return Path(path)
30 changes: 30 additions & 0 deletions src/sisl/nodes/tests/test_file_nodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import tempfile

Check failure on line 1 in src/sisl/nodes/tests/test_file_nodes.py

View workflow job for this annotation

GitHub Actions / lint / lint

Imports are incorrectly sorted and/or formatted.
from pathlib import Path
import time

import pytest

from sisl.nodes import FileNode


def test_file_node_return():
n = FileNode("test.txt")

assert n.get() == Path("test.txt")


def test_file_node_update():
pytest.importorskip("watchdog")

with tempfile.NamedTemporaryFile("w", delete=False) as f:
n = FileNode(f.name)

assert n.get() == Path(f.name)

assert not n._outdated

f.write("test")

time.sleep(0.2)

assert n._outdated
Loading