Skip to content

Commit

Permalink
Add stuff to sftp_interface class: compare directories and directory …
Browse files Browse the repository at this point in the history
…walking
  • Loading branch information
RichieHakim committed Aug 23, 2024
1 parent b6cb5a7 commit d943a33
Showing 1 changed file with 145 additions and 2 deletions.
147 changes: 145 additions & 2 deletions bnpm/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ def __init__(

self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

self.ssh = None

def connect(
self,
Expand Down Expand Up @@ -628,6 +626,7 @@ def __init__(
elif isinstance(ssh_client, paramiko.Channel):
self.transport = ssh_client.get_transport()
self.sftp = paramiko.SFTPClient.from_transport(self.transport)
self.client = None

def connect(
self,
Expand All @@ -650,6 +649,8 @@ def connect(
self.sftp = self.client.open_sftp()
else:
raise ValueError('No valid connection method found')

self.ssh = ssh_interface(ssh_client=self.client)

def put_dir(self, source, target, mkdir=True, verbose=True):
'''
Expand Down Expand Up @@ -1008,6 +1009,148 @@ def conj_func(a,b):
callback=conj_func if prog_bar else callback,
).download()

def compare_directories(
self,
dir_remote,
dir_local,
depth=None,
verbose=True,
):
"""
Compares a local directory with a remote directory using file metadata.
Args:
dir_remote (str):
Path to the remote directory.
dir_local (str):
Path to the local directory.
depth (int):
Maximum depth to search. If None, will search recursively
through all subdirectories and files. If 0, will only search the
current directory.
verbose (bool):
Whether or not to print progress: \n
* 0/False: no printing \n
* 1/True: will print found files \n
* 2: will print the current directory being searched.
"""
print(f"Walking remote directory: {dir_remote}") if verbose else None
paths_rel_remote = [str(Path(p).relative_to(dir_remote)) for p in self.walk_remote(dir_remote, depth=depth, verbose=verbose > 1)]
print(f"Walking local directory: {dir_local}") if verbose else None
paths_rel_local = [str(Path(p).relative_to(dir_local)) for p in self.walk_local(dir_local, depth=depth, verbose=verbose > 1)]

paths_common = set(paths_rel_remote) & set(paths_rel_local)
paths_unique_remote = set(paths_rel_remote) - set(paths_rel_local)
paths_unique_local = set(paths_rel_local) - set(paths_rel_remote)

def _metadata_remote(path):
stat = self.sftp.stat(str(Path(dir_remote) / path))
return {
'size': stat.st_size,
'last_modified': datetime.datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
'permissions': stat.st_mode,
}
def _metadata_local(path):
stat = (Path(dir_local) / path).stat()
return {
'size': stat.st_size,
'last_modified': datetime.datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
'permissions': stat.st_mode,
}

metadata_remote = {path: _metadata_remote(path) for path in paths_common | paths_unique_remote}
metadata_local = {path: _metadata_local(path) for path in paths_common | paths_unique_local}


metadata_matched = {path: {meta: metadata_remote[path][meta] == metadata_local[path][meta] for meta in metadata_remote[path]} for path in paths_common}

return {
'paths_common': paths_common,
'paths_unique_remote': paths_unique_remote,
'paths_unique_local': paths_unique_local,
'metadata_remote': metadata_remote,
'metadata_local': metadata_local,
'metadata_matched': metadata_matched,
}

def walk_remote(
self,
dir_remote,
depth=None,
verbose=True,
):
"""
Walks through a remote directory. Yielding the path to each file. Does
not return empty directories.
Args:
dir_remote (str):
Path to the remote directory.
depth (int):
Maximum depth to search. If None, will search recursively
through all subdirectories and files. If 0, will only search the
current directory.
verbose (bool):
Whether or not to print progress: \n
* 0/False: no printing \n
* 1/True: will print found files \n
* 2: will print the current directory being searched.
"""
if depth is None:
depth = float('inf')

def _recursive_walk(sftp, cwd='.', depth_current=0, verbose=True):
if depth_current > depth:
return
contents = {name: stat.S_ISDIR(attr.st_mode) for name, attr in zip(sftp.listdir(cwd), sftp.listdir_attr(cwd))}
for name, isdir in contents.items():
if isdir:
print(f'cwd: {cwd}, name: {name}, isdir: {isdir}, depth: {depth_current}') if verbose > 1 else None
yield from _recursive_walk(sftp, cwd=str(Path(cwd) / name), depth_current=depth_current+1, verbose=verbose)
else:
print(f"found: {str(Path(cwd) / name)}") if verbose else None
yield str(Path(cwd) / name)

return _recursive_walk(self.sftp, cwd=dir_remote, depth_current=0, verbose=verbose)

def walk_local(
self,
dir_local,
depth=None,
verbose=True,
):
"""
Walks through a local directory. Yielding the path to each file. Does
not return empty directories.
Args:
dir_local (str):
Path to the local directory.
depth (int):
Maximum depth to search. If None, will search recursively
through all subdirectories and files. If 0, will only search the
current directory.
verbose (bool):
Whether or not to print progress: \n
* 0/False: no printing \n
* 1/True: will print found files \n
* 2: will print the current directory being searched.
"""
if depth is None:
depth = float('inf')

def _recursive_walk(cwd, depth_current=0, verbose=True):
if depth_current > depth:
return
contents = {name: Path(cwd / name).is_dir() for name in os.listdir(cwd)}
for name, isdir in contents.items():
if isdir:
print(f'cwd: {cwd}, name: {name}, isdir: {isdir}, depth: {depth_current}') if verbose > 1 else None
yield from _recursive_walk(cwd / name, depth_current=depth_current+1, verbose=verbose)
else:
print(f"found: {cwd / name}") if verbose else None
yield str(cwd / name)

return _recursive_walk(Path(dir_local), depth_current=0, verbose=verbose)

def close(self):
if hasattr(self, 'transport'):
self.transport.close()
Expand Down

0 comments on commit d943a33

Please sign in to comment.