Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple mount points #34

Merged
merged 9 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 122 additions & 89 deletions rsync_to_rdisc.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,39 @@ def send_email(subject, template, body_params, attachments=None):
)


def send_mail_lost_mount(run_file):
def send_mail_lost_mount(mount_name, run_file):
hostname = gethostname()
send_email(
f"ERROR: mount lost to BGarray for {hostname}", "lost_mount.html",
{"hostname": hostname, "run_file": run_file},
subject=f"ERROR: mount lost to {mount_name} for {hostname}",
template="lost_mount.html",
body_params={"mount_name": mount_name, "hostname": hostname, "run_file": run_file},
ellendejong marked this conversation as resolved.
Show resolved Hide resolved
)


def send_mail_lost_hpc(hpc_host, run_file):
send_email(
f"ERROR: Connection to HPC transfernodes {hpc_host} are lost", "lost_hpc.html",
{"filename": hpc_host, "run_file": run_file},
subject=f"ERROR: Connection to HPC transfernodes {hpc_host} are lost",
template="lost_hpc.html",
body_params={"filename": hpc_host, "run_file": run_file},
)


def send_mail_transfer_state(filename, state, upload_result_gatk=None, upload_result_exomedepth=None):
body_params = {"filename": filename}
if state in ["ok", "vcf_upload_error", "vcf_upload_warning"]:
if state == "ok":
subject = f"COMPLETED: Transfer to BGarray has succesfully completed for {filename}"
subject = f"COMPLETED: Transfer has successfully completed for {filename}"
elif state == "vcf_upload_error":
subject = f"ERROR: Transfer to BGarray has completed with VCF upload error for {filename}"
subject = f"ERROR: Transfer has completed with VCF upload error for {filename}"
elif state == "vcf_upload_warning":
subject = f"COMPLETED: Transfer to BGarray has completed with VCF upload warning for {filename}"
subject = f"COMPLETED: Transfer has completed with VCF upload warning for {filename}"
template = "transfer_ok.html"
body_params.update({
"upload_result_gatk": upload_result_gatk,
"upload_result_exomedepth": upload_result_exomedepth
})
elif state == "error":
subject = f"ERROR: Transfer to BGarray has not completed for {filename}"
subject = f"ERROR: Transfer has not completed for {filename}"
template = "transfer_error.html"
send_email(subject, template, body_params)

Expand All @@ -75,36 +77,50 @@ def send_mail_incomplete(run, title_template, subject, run_file):
send_email(subject, f"{title_template}.html", body_params)


def check_rsync(run, folder, temperror, log):
if not Path(temperror).stat().st_size:
msg_bgarray_log = [[""], [">>> No errors detected <<<"]]
Path(temperror).unlink()
def check_rsync(run, transfer_settings):
if not Path(settings.temp_error_path).stat().st_size:
log_msg = [[""], [">>> No errors detected <<<"]]
Path(settings.temp_error_path).unlink() # remove tmperror file.
rsync_result = "ok"
else:
msg_bgarray_log = [
[""], [f">>>{run}_{folder} errors detected in Processed data transfer, not added to completed files <<<"]
log_msg = [
[""],
[f">>>{run}_{transfer_settings['name']} errors detected in data transfer, not added to completed files <<<"]
]
send_mail_transfer_state("{}{}".format(settings.folder_dic[folder]["input"], run), "error")
send_mail_transfer_state("{}{}".format(transfer_settings["input"], run), "error")
rernst marked this conversation as resolved.
Show resolved Hide resolved
rsync_result = "error"
with open(log, 'a', newline='\n') as log_file:

with open(settings.log_path, 'a', newline='\n') as log_file:
log_file_writer = writer(log_file, delimiter='\t')
log_file_writer.writerows(msg_bgarray_log)
log_file_writer.writerows(log_msg)

return rsync_result


def check_daemon_running(wkdir):
run_file = Path(f"{wkdir}/transfer.running")
try:
run_file = Path(f"{wkdir}/transfer.running")
run_file.touch(exist_ok=False)
return run_file
except FileExistsError:
sys.exit()
else:
return run_file
ellendejong marked this conversation as resolved.
Show resolved Hide resolved


def check_mount(bgarray, run_file):
if not Path(bgarray).exists():
send_mail_lost_mount(run_file)
sys.exit()
def is_mount_available(mount_name, mount_path, run_file):
    """Return True if the mount path exists; otherwise send a lost-mount email.

    Args:
        mount_name: Name of the mount, passed on to the lost-mount email.
        mount_path: Filesystem path of the mount point to probe.
        run_file: Run file reference, passed on to the lost-mount email.

    Returns:
        bool: True when ``mount_path`` exists, False when it does not exist
        or when probing it raises an OSError.
    """
    try:
        # Probe the path exactly once: a second, unguarded exists() call could
        # raise if the mount drops between the two checks.
        is_available = Path(mount_path).exists()
    except OSError:
        # BlockingIOError is a subclass of OSError, so this single clause
        # covers both exception types.
        is_available = False

    # Send email if mount is not available
    if not is_available:
        send_mail_lost_mount(mount_name, run_file)
    return is_available


def get_transferred_runs(wkdir):
Expand Down Expand Up @@ -132,28 +148,28 @@ def connect_to_remote_server(host_keys, servers, user, run_file):
except OSError:
if hpc_server == servers[-1]:
send_mail_lost_hpc(" and ".join(servers), run_file)
sys.exit("Connection to HPC transfernodes are lost.")
sys.exit("Connection to HPC transfer nodes are lost.")
except (timeout, ssh_exception.SSHException, ssh_exception.AuthenticationException):
if hpc_server == servers[-1]:
Path(run_file).unlink()
sys.exit("HPC connection timeout/SSHException/AuthenticationException")
return client, hpc_server


def get_folders_remote_server(client, folder_dic, run_file, transferred_set):
def get_folders_remote_server(client, transfers, run_file, transferred_set):
to_be_transferred = {}
for folder in folder_dic:
for transfer in transfers:
try:
stdin, stdout, stderr = client.exec_command("ls {}".format(folder_dic[folder]["input"]))
stdin, stdout, stderr = client.exec_command("ls {}".format(transfer["input"]))
except (ConnectionResetError, TimeoutError):
Path(run_file).unlink()
sys.exit("HPC connection ConnectionResetError/TimeoutError")

folders = stdout.read().decode("utf8").split()
for item in folders:
combined = f"{item}_{folder}"
input_folders = stdout.read().decode("utf8").split()
for input_folder in input_folders:
combined = f"{input_folder}_{transfer['name']}"
if combined not in transferred_set:
to_be_transferred[item] = folder
to_be_transferred[input_folder] = transfer

return to_be_transferred

Expand All @@ -172,92 +188,95 @@ def check_if_file_missing(required_files, input_folder, client):
return missing


def action_if_file_missing(folder, remove_run_file, missing, run, folder_location, run_file):
def action_if_file_missing(transfer_settings, rsync_succes, missing, run, run_file):
# Send a mail and lock datatransfer
if not isinstance(folder.get('continue_without_email', None), bool):
reason = ("Unknown status {0}: {1} in settings.py for {2}").format(
'continue_without_email', folder.get('continue_without_email', None), folder)
if not isinstance(transfer_settings.get('continue_without_email', None), bool):
reason = "Unknown status {0}: {1} in settings.py for {2}".format(
'continue_without_email',
transfer_settings.get('continue_without_email', None),
transfer_settings['name']
)
send_mail_incomplete(run, "settings", reason, run_file)
return False
elif 'continue_without_email' in folder and folder["continue_without_email"]:
# Do not send a mail and do not lock datatransfer
return remove_run_file
elif 'continue_without_email' in folder and not folder["continue_without_email"]:
# Send a mail and lock datatransfer
# Do not send a mail and do not lock datatransfer
elif 'continue_without_email' in transfer_settings and transfer_settings["continue_without_email"]:
return rsync_succes
# Send a mail and lock datatransfer
elif 'continue_without_email' in transfer_settings and not transfer_settings["continue_without_email"]:
reason = (
"Analysis not complete (file(s) {0} missing). "
"Run = {1} in folder {2} ".format(" and ".join(missing), run, folder_location))
"Run = {1} in folder {2} ".format(" and ".join(missing), run, transfer_settings["input"]))
send_mail_incomplete(run, "transfer_notcomplete", reason, run_file)
return False


def rsync_server_remote(
hpc_server, client, to_be_transferred, run_file, log=settings.log, bgarray=settings.bgarray, wkdir=settings.wkdir
):
def rsync_server_remote(hpc_server, client, to_be_transferred, mount_path, run_file):
ellendejong marked this conversation as resolved.
Show resolved Hide resolved
date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
remove_run_file = True
rsync_succes = True

temperror = settings.temperror
folder_dic = settings.folder_dic
for run in to_be_transferred:
with open(f"{bgarray}/{log}", 'a', newline='\n') as log_file:
with open(settings.log_path, 'a', newline='\n') as log_file:
log_file_writer = writer(log_file, delimiter='\t')
log_file_writer.writerows([["#########"], [f"Date: {date}", f"Run_folder: {run}"]])
folder_location_type = to_be_transferred[run]
# Settings per folder data type, such as remote input dir and local output dir, etc.
folder = folder_dic[folder_location_type]
missing = check_if_file_missing(folder["files_required"], f"{folder['input']}{run}", client)

transfer_settings = to_be_transferred[run]
# settings per folder data type, such as remote input dir and local output dir, etc.
missing = check_if_file_missing(transfer_settings["files_required"], f"{transfer_settings['input']}/{run}", client)

if missing:
remove_run_file = action_if_file_missing(folder, remove_run_file, missing, run, folder_location_type, run_file)
# Don't transfer the run if a required file is missing.
continue
os.system(
(
"rsync -rahuL --stats {user}@{server}:{path}{run} {output}/ "
" 1>> {bgarray}/{log} 2>> {bgarray}/{errorlog} 2> {temperror}"
).format(
print(transfer_settings, rsync_succes, missing, run, run_file)
rsync_succes = action_if_file_missing(transfer_settings, rsync_succes, missing, run, run_file)
continue # don't transfer the run if a required file is missing.
os.system((
"rsync -rahuL --stats {user}@{server}:{input}/{run} {mount_path}/{output}/ "
" 1>> {log_path} 2>> {errorlog_path} 2> {temp_error_path}").format(
user=settings.user,
server=hpc_server,
path=folder["input"],
input=transfer_settings["input"],
run=run,
output=folder["output"],
bgarray=bgarray,
log=log,
errorlog=settings.errorlog,
temperror=temperror
)
)
rsync_result = check_rsync(run, folder_location_type, temperror, f"{bgarray}/{log}")
mount_path=mount_path,
output=transfer_settings["output"],
log_path=settings.log_path,
errorlog_path=settings.errorlog_path,
temp_error_path=settings.temp_error_path
))
rsync_result = check_rsync(run, transfer_settings)

if rsync_result == "ok":
upload_result_gatk = None
upload_result_exomedepth = None
email_state = rsync_result

if folder['upload_gatk_vcf']:
upload_state, upload_result_gatk = upload_gatk_vcf(run, f"{folder['output']}/{run}")
if transfer_settings['upload_gatk_vcf']:
upload_state, upload_result_gatk = upload_gatk_vcf(
run=run,
run_folder="{output}/{run}".format(output=transfer_settings["output"], run=run)
)
if upload_state != "ok":
# Warning or error
email_state = f"vcf_upload_{upload_state}"

if folder['upload_exomedepth_vcf']:
upload_state, upload_result_exomedepth = upload_exomedepth_vcf(run, f"{folder['output']}/{run}")
if transfer_settings['upload_exomedepth_vcf']:
upload_state, upload_result_exomedepth = upload_exomedepth_vcf(
run=run,
run_folder="{output}/{run}".format(output=transfer_settings["output"], run=run)
)
# To avoid email_state 'vcf_upload_error' to become a 'vcf_upload_warning'
if upload_state != "ok" and email_state != "vcf_upload_error":
email_state = f"vcf_upload_{upload_state}"

send_mail_transfer_state(
filename="{}{}".format(folder["input"], run),
filename="{}{}".format(transfer_settings["input"], run),
state=email_state,
upload_result_gatk=upload_result_gatk,
upload_result_exomedepth=upload_result_exomedepth
)
# Do not include run in transferred_runs.txt if temp error file is not empty.
with open(f"{wkdir}/transferred_runs.txt", 'a', newline='\n') as log_file:
with open(f"{settings.wkdir}/transferred_runs.txt", 'a', newline='\n') as log_file:
log_file_writer = writer(log_file, delimiter='\t')
log_file_writer.writerow([f"{run}_{folder_location_type}", email_state])
return remove_run_file
log_file_writer.writerow([f"{run}_{transfer_settings['name']}", email_state])

return rsync_succes


def run_vcf_upload(vcf_file, vcf_type, run):
Expand Down Expand Up @@ -339,24 +358,38 @@ def upload_exomedepth_vcf(run, run_folder):


if __name__ == "__main__":

"""If daemon is running exit, else create transfer.running file and continue."""
# If daemon is running exit, else create transfer.running file and continue.
run_file = check_daemon_running(settings.wkdir)
remove_run_file = True

"""Check if mount to BGarray intact."""
check_mount(settings.bgarray, run_file)

"""Make set of transferred_runs.txt file, or create transferred_runs.txt if not present."""
# Make set of transferred_runs.txt file, or create transferred_runs.txt if not present.
transferred_set = get_transferred_runs(settings.wkdir)

"""Get folders to be transfer from HPC."""
# Connect to hpc
client, hpc_server = connect_to_remote_server(settings.host_keys, settings.server, settings.user, run_file)
to_be_transferred = get_folders_remote_server(client, settings.folder_dic, run_file, transferred_set)

"""Rsync folders from HPC to bgarray."""
remove_run_file = rsync_server_remote(hpc_server, client, to_be_transferred, run_file)
# Run rsync commands for each mount point.
for mount_name in settings.transfer_settings:
mount_path = settings.transfer_settings[mount_name]['mount_path']
# Check if mount is available and continue
if is_mount_available(mount_name, mount_path, run_file):
ellendejong marked this conversation as resolved.
Show resolved Hide resolved
# Get folders to be transferred
to_be_transferred = get_folders_remote_server(
client,
settings.transfer_settings[mount_name]['transfers'],
run_file,
transferred_set
)

# Rsync folders from HPC to mount
rsync_succes = rsync_server_remote(hpc_server, client, to_be_transferred, mount_path, run_file)
if not rsync_succes:
remove_run_file = False
else: # Mount not available block upcoming transfers
# TODO: Do we want this?
remove_run_file = False

"""Remove run_file if transfer daemon shouldn't be blocked to prevent repeated mailing."""
# Remove run_file if transfer daemon shouldn't be blocked to prevent repeated mailing.
if remove_run_file:
Path(run_file).unlink()

Expand Down
Loading
Loading