Skip to content

Commit

Permalink
Merge pull request #67 from getyourguide/feat/add-option-to-watch-use…
Browse files Browse the repository at this point in the history
…r-defined-files

Feat/add option to watch user defined files
  • Loading branch information
sgerloff authored Jan 29, 2024
2 parents e1840e4 + a32ed23 commit 0f988ac
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 25 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog db-rocket

## Version 2.1.0
- New parameter for ``rocket launch --glob_path=<...>``, which allows specifying a list of globs for files to deploy during launch.

## Version 2.0.4
- Update version number.

Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ and following in a new Python cell:
Finally, add the content in your databricks notebook:
![imgs/img_2.png](imgs/img_2.png)

#### Include non-python files
Upload all root level json files:
```shell
rocket launch --glob_path="*.json"
```
To also upload all env files, pass a list of globs:
```shell
rocket launch --glob_path="[\"*.json\", \".env*\"]"
```
When specifying a list, be mindful of the formatting of the parameter string: the whole list must be quoted and the inner quotes escaped, as in the example above.

### To Upload Your Python Package

If you've disabled the watch feature, `databricks-rocket` will only upload your project as a wheel to DBFS:
Expand Down
22 changes: 16 additions & 6 deletions rocket/file_watcher.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,37 @@
import glob
import os
import time

from typing import List
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from rocket.utils import gather_glob_paths


class FileWatcher:
class _Handler(FileSystemEventHandler):
def __init__(self, watcher_instance):
self.watcher_instance = watcher_instance

def on_modified(self, event):
if event.is_directory:
_current_glob_files = gather_glob_paths(self.watcher_instance.glob_paths)
if event.src_path in _current_glob_files:
self.watcher_instance.modified_files.add(event.src_path)
elif event.is_directory:
return
if os.path.splitext(event.src_path)[1] == ".py":
self.watcher_instance.modified_files.append(event.src_path)
elif os.path.splitext(event.src_path)[1] == ".py":
self.watcher_instance.modified_files.add(event.src_path)

def __init__(self, path_to_watch, callback, recursive=True):
def __init__(self, path_to_watch, callback, recursive=True, glob_paths: List[str] = None):
self.path_to_watch = path_to_watch
self.callback = callback
self.recursive = recursive
self.observer = Observer()
self.modified_files = []
self.modified_files = set()
self.glob_paths = glob_paths
if self.glob_paths is None:
self.glob_paths = []
self.handler = self._Handler(self)

def start(self):
Expand All @@ -33,7 +43,7 @@ def start(self):
while True:
time.sleep(1)
if self.modified_files:
self.callback(self.modified_files)
self.callback(list(self.modified_files))
self.modified_files.clear()
except KeyboardInterrupt:
self.observer.stop()
Expand Down
60 changes: 42 additions & 18 deletions rocket/rocket.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
from typing import Optional
import glob
from typing import Optional, List, Union

import fire

Expand All @@ -10,6 +11,7 @@
extract_python_package_dirs,
extract_python_files_from_folder,
execute_for_each_multithreaded,
gather_glob_paths,
)


Expand All @@ -26,7 +28,7 @@ def setup(self):
Initialize the application.
"""
if os.path.exists("setup.py") or os.path.exists(f"pyproject.toml"):
logger.info("Packaing file already exists so no need to create a new one")
logger.info("Packaging file already exists so no need to create a new one")
return

content = """
Expand All @@ -51,13 +53,15 @@ def launch(
self,
project_location: str = ".",
dbfs_path: Optional[str] = None,
watch=True,
):
watch: bool = True,
glob_path: Optional[Union[str, List[str]]] = None
) -> None:
"""
Entrypoint of the application, triggers a build and deploy
:param project_location:
:param project_location: path to project code, default: `"."`
:param dbfs_path: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject
:param watch: Set to false if you don't want to automatically sync your files
:param glob_path: glob string or list of strings for additional files to deploy, e.g. "*.json"
:return:
"""
if os.getenv("DATABRICKS_TOKEN") is None:
Expand All @@ -79,7 +83,13 @@ def launch(
project_name = os.path.abspath(project_location).split("/")[-1]
dbfs_path = f"{dbfs_path}/{project_name}"

self._build_and_deploy(watch, project_location, dbfs_path)
glob_paths = []
if isinstance(glob_path, str):
glob_paths = [os.path.join(project_location, glob_path)]
elif isinstance(glob_path, list):
glob_paths = [os.path.join(project_location, path) for path in glob_path]

self._build_and_deploy(watch=watch, project_location=project_location, dbfs_path=dbfs_path, glob_paths=glob_paths)
if watch:
watcher = FileWatcher(
project_location,
Expand All @@ -88,13 +98,20 @@ def launch(
modified_files=watcher.modified_files,
dbfs_path=dbfs_path,
project_location=project_location,
glob_paths=glob_path
),
glob_paths=glob_paths,
)
watcher.start()

def _build_and_deploy(
self, watch, project_location, dbfs_path, modified_files=None
):
self,
watch: bool,
project_location: str,
dbfs_path: str,
modified_files: Optional[List[str]] = None,
glob_paths: Optional[List[str]] = None
) -> None:
if modified_files:
logger.info(f"Found changes in {modified_files}. Overwriting them.")
self._deploy(
Expand Down Expand Up @@ -134,15 +151,17 @@ def _build_and_deploy(
return

package_dirs = extract_python_package_dirs(project_location)
files = []
files = set()
for package_dir in package_dirs:
for file in extract_python_files_from_folder(package_dir):
files.append(file)
files.update(extract_python_files_from_folder(package_dir))

if glob_paths is not None:
files.update(gather_glob_paths(glob_paths))

project_files = ["setup.py", "pyproject.toml"]
for project_file in project_files:
if os.path.exists(f"{project_location}/{project_file}"):
files.append(f"{project_location}/{project_file}")
files.add(f"{project_location}/{project_file}")

if os.path.exists(f"{project_location}/pyproject.toml"):
execute_shell_command(
Expand All @@ -156,15 +175,15 @@ def _build_and_deploy(
for dependency_file in dependency_files:
dependency_file_path = f"{project_location}/{dependency_file}"
if os.path.exists(dependency_file_path):
files.append(dependency_file_path)
files.add(dependency_file_path)
uploaded_dependency_file = dependency_file
dependency_file_exist = True
with open(dependency_file_path) as f:
index_urls = [
line.strip() for line in f.readlines() if "index-url" in line
]
self._deploy(
file_paths=files, dbfs_path=dbfs_path, project_location=project_location
file_paths=list(files), dbfs_path=dbfs_path, project_location=project_location
)

install_path = f'{dbfs_path.replace("dbfs:/", "/dbfs/")}'
Expand Down Expand Up @@ -193,15 +212,20 @@ def _build_and_deploy(
%autoreload 2"""
)

def _deploy(self, file_paths, dbfs_path, project_location):
def helper(file):
def _deploy(
self,
file_paths: List[str],
dbfs_path: str,
project_location: str
) -> None:
def helper(file: str) -> None:
target_path = f"{dbfs_path}/{os.path.relpath(file, project_location)}"
execute_shell_command(f"databricks fs cp --overwrite {file} {target_path}")
execute_shell_command(f"databricks fs cp --recursive --overwrite {file} {target_path}")
logger.info(f"Uploaded {file} to {target_path}")

execute_for_each_multithreaded(file_paths, lambda x: helper(x))

def _create_python_project_wheel(self, project_location):
def _create_python_project_wheel(self, project_location: str) -> (str, str):
dist_location = f"{project_location}/dist"
execute_shell_command(f"rm {dist_location}/* 2>/dev/null || true")

Expand Down
9 changes: 9 additions & 0 deletions rocket/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import concurrent.futures
import glob
import os
import subprocess

from typing import List, Set
from rocket.logger import logger


Expand Down Expand Up @@ -53,3 +55,10 @@ def extract_python_files_from_folder(path):
py_files.append(os.path.join(root, file))

return py_files


def gather_glob_paths(glob_paths: List[str]) -> Set[str]:
    """Expand glob patterns into the set of paths that currently match.

    Each pattern is expanded with ``glob.glob`` and the matches are merged,
    so a path matched by several patterns appears only once.

    :param glob_paths: glob patterns to expand. For robustness, ``None`` is
        treated as "no patterns" and a single bare string is treated as one
        pattern instead of being iterated character by character.
    :return: set of matching paths, empty if nothing matches
    """
    if glob_paths is None:
        return set()
    if isinstance(glob_paths, str):
        # Iterating a str yields single characters; a pattern such as
        # "*.json" would then be globbed as "*", ".", "j", ... and match
        # far more than intended.
        glob_paths = [glob_paths]

    _unique_paths = set()
    for glob_path in glob_paths:
        _unique_paths.update(glob.glob(glob_path))
    return _unique_paths
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

setuptools.setup(
name="databricks-rocket",
version="2.0.4",
version="2.1.0",
author="GetYourGuide",
author_email="[email protected]",
description="Keep your local python scripts installed and in sync with a databricks notebook. Shortens the feedback loop to develop projects using a hybrid enviroment",
Expand Down

0 comments on commit 0f988ac

Please sign in to comment.