Skip to content

Commit

Permalink
resolved merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
fdekievit committed Nov 12, 2024
2 parents 10a44af + 2bd8ac6 commit 53356b6
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 223 deletions.
211 changes: 122 additions & 89 deletions rsync_to_rdisc.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,39 @@ def send_email(subject, template, body_params, attachments=None):
)


def send_mail_lost_mount(mount_name, run_file):
    """Send an e-mail reporting that a mount is no longer reachable from this host.

    Args:
        mount_name: Name of the lost mount; used in the subject and handed to the template.
        run_file: Run (lock) file of the transfer daemon; handed to the template.
    """
    hostname = gethostname()
    send_email(
        subject=f"ERROR: mount lost to {mount_name} for {hostname}",
        template="lost_mount.html",
        body_params={"mount_name": mount_name, "hostname": hostname, "run_file": run_file},
    )


def send_mail_lost_hpc(hpc_host, run_file):
    """Send an e-mail reporting that the HPC transfer nodes cannot be reached.

    Args:
        hpc_host: Description of the unreachable host(s); used in the subject.
        run_file: Run (lock) file of the transfer daemon; handed to the template.
    """
    send_email(
        subject=f"ERROR: Connection to HPC transfernodes {hpc_host} are lost",
        template="lost_hpc.html",
        # The host description is passed under the "filename" key.
        # NOTE(review): presumably lost_hpc.html reads "filename" - confirm before renaming.
        body_params={"filename": hpc_host, "run_file": run_file},
    )


def send_mail_transfer_state(filename, state, upload_result_gatk=None, upload_result_exomedepth=None):
    """Send an e-mail describing the outcome of a transfer.

    Args:
        filename: Path of the transferred run; used in the subject and the template.
        state: One of "ok", "vcf_upload_error", "vcf_upload_warning" or "error".
        upload_result_gatk: Result of the GATK VCF upload, only used for non-"error" states.
        upload_result_exomedepth: Result of the ExomeDepth VCF upload, only used for
            non-"error" states.

    Raises:
        ValueError: If `state` is not one of the supported values.
    """
    completed_subjects = {
        "ok": f"COMPLETED: Transfer has successfully completed for {filename}",
        "vcf_upload_error": f"ERROR: Transfer has completed with VCF upload error for {filename}",
        "vcf_upload_warning": f"COMPLETED: Transfer has completed with VCF upload warning for {filename}",
    }
    body_params = {"filename": filename}

    if state in completed_subjects:
        subject = completed_subjects[state]
        template = "transfer_ok.html"
        body_params.update({
            "upload_result_gatk": upload_result_gatk,
            "upload_result_exomedepth": upload_result_exomedepth
        })
    elif state == "error":
        subject = f"ERROR: Transfer has not completed for {filename}"
        template = "transfer_error.html"
    else:
        # Fail with a clear message instead of an UnboundLocalError on `subject`.
        raise ValueError(f"Unknown transfer state: {state!r}")

    send_email(subject, template, body_params)

Expand All @@ -75,36 +77,50 @@ def send_mail_incomplete(run, title_template, subject, run_file):
send_email(subject, f"{title_template}.html", body_params)


def check_rsync(run, transfer_settings):
    """Inspect the temporary rsync error file and log the outcome of a transfer.

    An empty error file (settings.temp_error_path) means the transfer succeeded and
    the file is removed. A non-empty file is left in place and an "error" e-mail is sent.
    In both cases a message is appended to settings.log_path.

    Args:
        run: Name of the transferred run folder.
        transfer_settings: Settings dict of this transfer; "name" and "input" are read.

    Returns:
        "ok" when no errors were detected, otherwise "error".
    """
    temp_error_file = Path(settings.temp_error_path)

    if not temp_error_file.stat().st_size:
        log_msg = [[""], [">>> No errors detected <<<"]]
        temp_error_file.unlink()  # remove tmperror file.
        rsync_result = "ok"
    else:
        log_msg = [
            [""],
            [f">>>{run}_{transfer_settings['name']} errors detected in data transfer, not added to completed files <<<"]
        ]
        # Join input and run with "/" so the reported path matches the rsync source path.
        send_mail_transfer_state("{}/{}".format(transfer_settings["input"], run), "error")
        rsync_result = "error"

    with open(settings.log_path, 'a', newline='\n') as log_file:
        log_file_writer = writer(log_file, delimiter='\t')
        log_file_writer.writerows(log_msg)

    return rsync_result


def check_daemon_running(wkdir):
    """Create the `transfer.running` run file, or exit if it already exists.

    Args:
        wkdir: Working directory in which the run file is created.

    Returns:
        Path of the newly created run file.

    Raises:
        SystemExit: If the run file already exists.
    """
    run_file = Path(f"{wkdir}/transfer.running")
    try:
        # exist_ok=False makes touch() fail when the run file is already present.
        run_file.touch(exist_ok=False)
    except FileExistsError:
        sys.exit()
    return run_file


def is_mount_available(mount_name, mount_path, run_file):
    """Check whether a mount path exists and send an e-mail if it does not.

    Args:
        mount_name: Name of the mount, used in the notification e-mail.
        mount_path: Path that must exist for the mount to count as available.
        run_file: Run (lock) file of the transfer daemon, used in the notification e-mail.

    Returns:
        True when `mount_path` exists, False when it does not or cannot be checked.
    """
    try:
        is_available = Path(mount_path).exists()
    except OSError:
        # Covers BlockingIOError as well, which is a subclass of OSError.
        is_available = False

    # Send email if mount is not available
    if not is_available:
        send_mail_lost_mount(mount_name, run_file)
    return is_available


def get_transferred_runs(wkdir):
Expand Down Expand Up @@ -132,28 +148,28 @@ def connect_to_remote_server(host_keys, servers, user, run_file):
except OSError:
if hpc_server == servers[-1]:
send_mail_lost_hpc(" and ".join(servers), run_file)
sys.exit("Connection to HPC transfernodes are lost.")
sys.exit("Connection to HPC transfer nodes are lost.")
except (timeout, ssh_exception.SSHException, ssh_exception.AuthenticationException):
if hpc_server == servers[-1]:
Path(run_file).unlink()
sys.exit("HPC connection timeout/SSHException/AuthenticationException")
return client, hpc_server


def get_folders_remote_server(client, transfers, run_file, transferred_set):
    """List the remote input folders that have not been transferred yet.

    For every transfer, `ls <input>` is executed through `client` and each listed
    entry is kept unless "<entry>_<transfer name>" is already in `transferred_set`.

    Args:
        client: Connected remote client providing `exec_command`.
        transfers: Iterable of transfer settings dicts; "input" and "name" are read.
        run_file: Run (lock) file, removed before exiting on a connection error.
        transferred_set: Collection of "<run>_<transfer name>" entries already transferred.

    Returns:
        Dict mapping run folder name to the settings dict of its transfer.

    Raises:
        SystemExit: If the connection is reset or times out while listing.
    """
    to_be_transferred = {}
    for transfer in transfers:
        try:
            _stdin, stdout, _stderr = client.exec_command("ls {}".format(transfer["input"]))
        except (ConnectionResetError, TimeoutError):
            Path(run_file).unlink()
            sys.exit("HPC connection ConnectionResetError/TimeoutError")

        input_folders = stdout.read().decode("utf8").split()
        for input_folder in input_folders:
            combined = f"{input_folder}_{transfer['name']}"
            if combined not in transferred_set:
                # Keyed on folder name only: the same name under a later transfer replaces this entry.
                to_be_transferred[input_folder] = transfer

    return to_be_transferred

Expand All @@ -172,92 +188,95 @@ def check_if_file_missing(required_files, input_folder, client):
return missing


def action_if_file_missing(transfer_settings, rsync_succes, missing, run, run_file):
    """Decide what to do with a run whose required files are missing.

    Args:
        transfer_settings: Settings dict of the transfer; "continue_without_email",
            "name" and "input" are read.
        rsync_succes: Current success flag, returned unchanged when no e-mail is needed.
        missing: Names of the missing required files.
        run: Name of the run folder.
        run_file: Run (lock) file, handed to the notification e-mail.

    Returns:
        `rsync_succes` when the transfer is configured to continue without e-mail,
        otherwise False (an e-mail was sent and data transfer should stay locked).
    """
    continue_without_email = transfer_settings.get('continue_without_email', None)

    # Invalid or missing setting: send a mail and lock datatransfer
    if not isinstance(continue_without_email, bool):
        reason = "Unknown status {0}: {1} in settings.py for {2}".format(
            'continue_without_email',
            continue_without_email,
            transfer_settings['name']
        )
        send_mail_incomplete(run, "settings", reason, run_file)
        return False

    # Do not send a mail and do not lock datatransfer
    if continue_without_email:
        return rsync_succes

    # Send a mail and lock datatransfer
    reason = (
        "Analysis not complete (file(s) {0} missing). "
        "Run = {1} in folder {2} ".format(" and ".join(missing), run, transfer_settings["input"]))
    send_mail_incomplete(run, "transfer_notcomplete", reason, run_file)
    return False


def rsync_server_remote(hpc_server, client, to_be_transferred, mount_path, run_file):
    """Rsync every pending run from the HPC server to the mount and report the result.

    For each run: write a header to settings.log_path, skip the run when required
    files are missing, otherwise rsync it, check the result, optionally upload VCF
    files, send a state e-mail and record the run in transferred_runs.txt.

    Args:
        hpc_server: Host name of the HPC transfer node used as rsync source.
        client: Connected remote client, used to check for required files.
        to_be_transferred: Dict mapping run folder name to its transfer settings dict.
        mount_path: Path of the mount; the transfer "output" is placed beneath it.
        run_file: Run (lock) file, handed to notification e-mails.

    Returns:
        True unless handling of a run with missing files returned False, in which
        case the caller should keep the run file.
    """
    date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    rsync_succes = True

    for run, transfer_settings in to_be_transferred.items():
        with open(settings.log_path, 'a', newline='\n') as log_file:
            log_file_writer = writer(log_file, delimiter='\t')
            log_file_writer.writerows([["#########"], [f"Date: {date}", f"Run_folder: {run}"]])

        # transfer_settings holds the settings per folder data type,
        # such as remote input dir and local output dir, etc.
        missing = check_if_file_missing(
            transfer_settings["files_required"], f"{transfer_settings['input']}/{run}", client
        )
        if missing:
            rsync_succes = action_if_file_missing(transfer_settings, rsync_succes, missing, run, run_file)
            continue  # don't transfer the run if a required file is missing.

        # NOTE(review): "2>> errorlog 2> temp_error" - the last redirection of fd 2 wins, so
        # stderr presumably only reaches the temp error file and never the error log; confirm.
        # NOTE(review): run names come from a remote `ls` and are interpolated unquoted.
        os.system((
            "rsync -rahuL --stats {user}@{server}:{input}/{run} {mount_path}/{output}/ "
            " 1>> {log_path} 2>> {errorlog_path} 2> {temp_error_path}").format(
                user=settings.user,
                server=hpc_server,
                input=transfer_settings["input"],
                run=run,
                mount_path=mount_path,
                output=transfer_settings["output"],
                log_path=settings.log_path,
                errorlog_path=settings.errorlog_path,
                temp_error_path=settings.temp_error_path
        ))
        rsync_result = check_rsync(run, transfer_settings)

        # Do not include run in transferred_runs.txt if temp error file is not empty.
        if rsync_result != "ok":
            continue

        upload_result_gatk = None
        upload_result_exomedepth = None
        email_state = rsync_result
        # NOTE(review): rsync writes to {mount_path}/{output}/ but this path omits mount_path;
        # verify that the upload functions resolve it against the mount.
        run_folder = "{output}/{run}".format(output=transfer_settings["output"], run=run)

        if transfer_settings['upload_gatk_vcf']:
            upload_state, upload_result_gatk = upload_gatk_vcf(run=run, run_folder=run_folder)
            if upload_state != "ok":
                # Warning or error
                email_state = f"vcf_upload_{upload_state}"

        if transfer_settings['upload_exomedepth_vcf']:
            upload_state, upload_result_exomedepth = upload_exomedepth_vcf(run=run, run_folder=run_folder)
            # To avoid email_state 'vcf_upload_error' to become a 'vcf_upload_warning'
            if upload_state != "ok" and email_state != "vcf_upload_error":
                email_state = f"vcf_upload_{upload_state}"

        send_mail_transfer_state(
            # Join input and run with "/" so the reported path matches the rsync source path.
            filename="{}/{}".format(transfer_settings["input"], run),
            state=email_state,
            upload_result_gatk=upload_result_gatk,
            upload_result_exomedepth=upload_result_exomedepth
        )

        with open(f"{settings.wkdir}/transferred_runs.txt", 'a', newline='\n') as log_file:
            log_file_writer = writer(log_file, delimiter='\t')
            log_file_writer.writerow([f"{run}_{transfer_settings['name']}", email_state])

    return rsync_succes


def run_vcf_upload(vcf_file, vcf_type, run):
Expand Down Expand Up @@ -340,24 +359,38 @@ def upload_exomedepth_vcf(run, run_folder):


if __name__ == "__main__":
    # If daemon is running exit, else create transfer.running file and continue.
    run_file = check_daemon_running(settings.wkdir)
    remove_run_file = True

    # Make set of transferred_runs.txt file, or create transferred_runs.txt if not present.
    transferred_set = get_transferred_runs(settings.wkdir)

    # Connect to hpc
    client, hpc_server = connect_to_remote_server(settings.host_keys, settings.server, settings.user, run_file)

    # Run rsync commands for each mount point.
    for mount_name, mount_settings in settings.transfer_settings.items():
        mount_path = mount_settings['mount_path']

        # Check if mount is available; an e-mail is sent by is_mount_available if it is not.
        if not is_mount_available(mount_name, mount_path, run_file):
            # Mount not available block upcoming transfers
            # TODO: Do we want this?
            remove_run_file = False
            continue

        # Get folders to be transferred
        to_be_transferred = get_folders_remote_server(
            client,
            mount_settings['transfers'],
            run_file,
            transferred_set
        )

        # Rsync folders from HPC to mount
        rsync_succes = rsync_server_remote(hpc_server, client, to_be_transferred, mount_path, run_file)
        if not rsync_succes:
            remove_run_file = False

    # Remove run_file if transfer daemon shouldn't be blocked to prevent repeated mailing.
    if remove_run_file:
        Path(run_file).unlink()

Expand Down
Loading

0 comments on commit 53356b6

Please sign in to comment.