diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1d1d82d..6583953 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,6 +17,14 @@ repos: pass_filenames: false additional_dependencies: - epi2melabs + - id: build_models + name: build_models + entry: datamodel-codegen --strict-nullable --base-class workflow_glue.results_schema_helpers.BaseModel --use-schema-description --disable-timestamp --input results_schema.yml --input-file-type openapi --output bin/workflow_glue/results_schema.py + language: python + files: 'results_schema.yml' + pass_filenames: false + additional_dependencies: + - datamodel-code-generator - repo: https://github.com/pycqa/flake8 rev: 5.0.4 hooks: @@ -37,4 +45,5 @@ repos: "--import-order-style=google", "--statistics", "--max-line-length=88", + "--extend-exclude=bin/workflow_glue/results_schema.py", ] diff --git a/bin/workflow_glue/check_bam_headers_in_dir.py b/bin/workflow_glue/check_bam_headers_in_dir.py new file mode 100755 index 0000000..d582de1 --- /dev/null +++ b/bin/workflow_glue/check_bam_headers_in_dir.py @@ -0,0 +1,58 @@ +"""Check (u)BAM files for `@SQ` lines whether they are the same in all headers.""" + +from pathlib import Path +import sys + +import pysam + +from .util import get_named_logger, wf_parser # noqa: ABS101 + + +def get_sq_lines(xam_file): + """Extract the `@SQ` lines from the header of a XAM file.""" + return pysam.AlignmentFile(xam_file, check_sq=False).header["SQ"] + + +def main(args): + """Run the entry point.""" + logger = get_named_logger("checkBamHdr") + + if not args.input_path.is_dir(): + raise ValueError(f"Input path '{args.input_path}' must be a directory.") + + target_files = list(args.input_path.glob("*")) + if not target_files: + raise ValueError(f"No files found in input directory '{args.input_path}'.") + # Loop over target files and check if there are `@SQ` lines in all headers or not. + # Set `is_unaligned` accordingly. If there are mixed headers (either with some files + # containing `@SQ` lines and some not or with different files containing different + # `@SQ` lines), set `mixed_headers` to `True`. + first_sq_lines = None + mixed_headers = False + for xam_file in target_files: + sq_lines = get_sq_lines(xam_file) + if first_sq_lines is None: + # this is the first file + first_sq_lines = sq_lines + else: + # this is a subsequent file; check with the first `@SQ` lines + if sq_lines != first_sq_lines: + mixed_headers = True + break + + # we set `is_unaligned` to `True` if there were no mixed headers and the last file + # didn't have `@SQ` lines (as we can then be sure that none of the files did) + is_unaligned = not mixed_headers and not sq_lines + # write `is_unaligned` and `mixed_headers` out so that they can be set as env. 
+ # variables + sys.stdout.write( + f"IS_UNALIGNED={int(is_unaligned)};MIXED_HEADERS={int(mixed_headers)}" + ) + logger.info(f"Checked (u)BAM headers in '{args.input_path}'.") + + +def argparser(): + """Argument parser for entrypoint.""" + parser = wf_parser("check_bam_headers") + parser.add_argument("input_path", type=Path, help="Path to target directory") + return parser diff --git a/bin/workflow_glue/report.py b/bin/workflow_glue/report.py index 6df81ac..3ed42bd 100755 --- a/bin/workflow_glue/report.py +++ b/bin/workflow_glue/report.py @@ -3,13 +3,13 @@ import json -from aplanat import annot, bars, hist, lines, report +from aplanat import bars, lines, report from aplanat.components import bcfstats, nextclade from aplanat.components import simple as scomponents +from aplanat.components.fastcat import read_length_plot, read_quality_plot from aplanat.util import Colors from bokeh.layouts import gridplot, layout from bokeh.models import Panel, Range1d, Tabs -import numpy as np import pandas as pd from pandas.api import types as pd_types import pysam @@ -28,7 +28,7 @@ def read_files(summaries, **kwargs): return pd.concat(dfs) -def output_json(df, consensus_fasta, fastcat_stats): +def output_json(df, consensus_fasta, readcounts): """Read depth stats df and create JSON output.""" grouped_by_sample = df.groupby('sample_name') all_json = {} @@ -51,12 +51,6 @@ def output_json(df, consensus_fasta, fastcat_stats): newdf[x].values.tolist() for x in newdf.columns))) all_json[sample] = final final_json = {'data': []} - seq_summary = pd.read_csv( - fastcat_stats, - delimiter="\t", - dtype={"sample_name": CATEGORICAL} - ) - readcounts = seq_summary['sample_name'].value_counts().to_dict() # parse the consensus fasta to get extra info required with pysam.FastxFile(consensus_fasta) as fh: for entry in fh: @@ -101,61 +95,38 @@ def main(args): This section displays basic QC metrics indicating read data quality. ''') # read length summary - seq_summary = pd.read_csv( - args.fastcat_stats, - delimiter="\t", - dtype={"sample_name": CATEGORICAL} + tabs = {} + sample_readcounts = {} + sample_goodreadcounts = {} + for summary_fn in args.fastcat_stats: + df_sample = pd.read_csv(summary_fn, sep="\t") + sample_id = df_sample['sample_name'].iloc[0] + rlp = read_length_plot( + df_sample, + min_len=args.min_len, + max_len=args.max_len, + xlim=(0, 2000), ) - total_bases = seq_summary['read_length'].sum() - mean_length = total_bases / len(seq_summary) - median_length = np.median(seq_summary['read_length']) - datas = [seq_summary['read_length']] - length_hist = hist.histogram( - datas, colors=[Colors.cerulean], binwidth=50, - title="Read length distribution.", - x_axis_label='Read Length / bases', - y_axis_label='Number of reads', - xlim=(0, 2000)) - length_hist = annot.marker_vline( - length_hist, args.min_len, - label="Min: {}".format(args.min_len), text_baseline='bottom', - color='grey') - length_hist = annot.marker_vline( - length_hist, args.max_len, - label="Max: {}".format(args.max_len), text_baseline='top') - length_hist = annot.subtitle( - length_hist, - "Mean: {:.0f}. Median: {:.0f}".format( - mean_length, median_length)) - - datas = [seq_summary['mean_quality']] - mean_q, median_q = np.mean(datas[0]), np.median(datas[0]) - q_hist = hist.histogram( - datas, colors=[Colors.cerulean], bins=100, - title="Read quality score", - x_axis_label="Quality score", - y_axis_label="Number of reads", - xlim=(4, 25)) - q_hist = annot.subtitle( - q_hist, - "Mean: {:.0f}. 
Median: {:.0f}".format( - mean_q, median_q)) + rqp = read_quality_plot(df_sample) + grid = gridplot( + [rlp, rqp], ncols=2, sizing_mode="stretch_width") + tabs[sample_id] = Panel(child=grid, title=sample_id) + # count total reads for output_json + sample_readcounts[sample_id] = df_sample.shape[0] + # count "good reads" for additional bar plot + good_reads = df_sample.loc[ + (df_sample['read_length'] > args.min_len) + & (df_sample['read_length'] < args.max_len) + ] + sample_goodreadcounts[sample_id] = good_reads.shape[0] + + barcode_keys = sorted(sample_goodreadcounts) # barcode count plot - good_reads = seq_summary.loc[ - (seq_summary['read_length'] > args.min_len) - & (seq_summary['read_length'] < args.max_len)] - barcode_counts = ( - pd.DataFrame(good_reads['sample_name'].value_counts()) - .sort_index() - .reset_index() - .rename( - columns={'index': 'sample', 'sample_name': 'count'}) - ) - bc_counts = bars.simple_bar( - barcode_counts['sample'].astype(str), barcode_counts['count'], - colors=[Colors.cerulean]*len(barcode_counts), + barcode_keys, + [sample_goodreadcounts.get(x) for x in barcode_keys], + colors=[Colors.cerulean]*len(sample_goodreadcounts), title=( 'Number of reads per barcode ' '(filtered by {} < length < {})'.format( @@ -163,10 +134,19 @@ def main(args): plot_width=None ) bc_counts.xaxis.major_label_orientation = 3.14/2 + + section = report_doc.add_section() + section.markdown(""" + ### Sequence summaries""") section.plot( layout( - [[length_hist, q_hist], [bc_counts]], - sizing_mode="stretch_width")) + [ + Tabs(tabs=[tabs.get(x) for x in barcode_keys]), + [bc_counts], + ], + sizing_mode="stretch_width" + ) + ) section = report_doc.add_section() section.markdown(""" @@ -215,7 +195,7 @@ def main(args): # depth summary by amplicon pool df = read_files( args.depths, sep="\t", converters={'sample_name': str}) - epi2me_json = output_json(df, args.consensus_fasta, args.fastcat_stats) + epi2me_json = output_json(df, args.consensus_fasta, sample_readcounts) json_object = json.dumps(epi2me_json, indent=4, separators=(',', ':')) json_file = open("artic.json", "a") json_file.write(json_object) @@ -378,7 +358,7 @@ def argparser(): "--depths", nargs='+', required=True, help="Depth summary files") parser.add_argument( - "--fastcat_stats", required=True, + "--fastcat_stats", nargs='+', required=True, help="fastcat summary file") parser.add_argument( "--nextclade", diff --git a/lib/Pinguscript.groovy b/lib/Pinguscript.groovy index a893263..4b91236 100644 --- a/lib/Pinguscript.groovy +++ b/lib/Pinguscript.groovy @@ -4,49 +4,154 @@ import groovy.json.JsonSlurper class Pinguscript { - public static String ping_post(workflow, message, error_message, out_dir, params) { - def msgId = UUID.randomUUID().toString() - def hosthash = null + + // Send a ping for the start of a workflow + public static void ping_start(nextflow, workflow, params) { + wf_ping(nextflow, workflow, "start", null, params) + } + // Send a ping for a completed workflow (successful or otherwise) + public static void ping_complete(nextflow, workflow, params) { + wf_ping(nextflow, workflow, "end", null, params) + } + // Send a ping for a workflow error + public static void ping_error(nextflow, workflow, params) { + def error_message = workflow.errorMessage + wf_ping(nextflow, workflow, "error", error_message, params) + } + // Shared handler to construct a ping JSON and send it + private static String wf_ping(nextflow, workflow, event, error_message, params) { + if (params.disable_ping) { + return "{}" + } + def body_json = 
make_wf_ping(nextflow, workflow, event, error_message, params) + send_ping_post("epilaby", body_json) + } + + // Helper to removing keys from a map + private static clean_meta(meta, keys_to_remove) { + for (key in keys_to_remove) { + if (meta.containsKey(key)) { + meta.remove(key) + } + } + } + + // Helper for fetching a key from the params map + // seems pointless but you just know someone is going to end up writing meta.this ? meta.that + private static get_meta(meta, key) { + (meta.containsKey(key) && meta[key]) ? meta[key].toString() : null + } + + // Construct workflow ping JSON + private static String make_wf_ping(nextflow, workflow, event, error_message, params) { + // cheeky deepcopy using json + String paramsJSON = new JsonBuilder(params).toPrettyString() + def params_data = new JsonSlurper().parseText(paramsJSON) + + // hostname + def host = null try { - hosthash = InetAddress.getLocalHost().getHostName() - } catch(Exception e) { - hosthash = "Unavailable" + host = InetAddress.getLocalHost().getHostName() } + catch(Exception e) {} + + // OS + // TODO check version on WSL def opsys = System.properties['os.name'].toLowerCase() - if (System.properties['os.version'].toLowerCase().contains("wsl")){ - opsys = "WSL" + def opver = System.properties['os.version'] + if (opver.toLowerCase().contains("wsl")){ + opsys = "wsl" + } + + // placeholder for any future okta business + // for now we'll use the guest_ sent to wf.epi2me_user + def user = get_meta(params.wf, "epi2me_user") + + // drop cruft to save some precious bytes + // affects the deep copy rather than original params + clean_meta(params_data, [ + "schema_ignore_params", + ]) + def ingress_ids = [] + if (params_data.containsKey("wf")) { + ingress_ids = params_data.wf["ingress.run_ids"] ?: [] + clean_meta(params_data.wf, [ + "agent", // we send this later + "epi2me_instance", // we send this later + "epi2me_user", // we send this later + "example_cmd", + "ingress.run_ids", // we will send this elsewhere + ]) + } + + // try and get runtime information + def cpus = null + try { + cpus = Runtime.getRuntime().availableProcessors() } - def workflow_name = "$workflow.manifest.name" - def session = "$workflow.sessionId" - def errorMessage = "$error_message" - def profile = "$workflow.profile" - def filename = "$out_dir/params.json" - File fileb = new File(filename) - def any_other_data = [:] - if (fileb.exists() && "$message" != "start") { - def jsonSlurper = new JsonSlurper() - any_other_data = jsonSlurper.parse(fileb) + catch(Exception e) {} + + def workflow_success = null + def workflow_exitcode = null + if (event != "start") { + workflow_success = workflow.success + workflow_exitcode = workflow.exitStatus } - def meta_json = new JsonBuilder() - def agent = "$params.wf.agent" - def meta = meta_json "error": errorMessage.toString(), "profile": profile.toString(), - "agent": agent.toString() - meta+=any_other_data - def ping_version = '2.0.2' - def tracking_json = new JsonBuilder() - def tracking_id = tracking_json "msg_id": msgId, "version": ping_version - def data_json = new JsonBuilder() - def data = data_json "workflow": workflow_name.toString(), - "message": message, "meta": meta + + /// build message def body_json = new JsonBuilder() - def root = body_json "tracking_id": tracking_id, "hostname": hosthash.toString(), "os": opsys.toString(), - "session": session.toString(), "data": data, "source": "workflow" + body_json \ + "tracking_id": [ + "msg_id": UUID.randomUUID().toString(), + "version": "3.0.0" + ], + "source": "workflow", + 
"event": event, + "params": params_data, + // data will be null on start events, as ingress has not run + "data": event != "start" ? [run_ids: ingress_ids] : null, + "workflow": [ + "name": workflow.manifest.name, + "version": workflow.manifest.version, // could use NfcoreTemplate.version(workflow) + "run_name": workflow.runName, // required to disambiguate sessions + "session": workflow.sessionId, + "profile": workflow.profile, + "resume": workflow.resume, + "error": error_message, // null if no error + "success": workflow_success, + "exitcode": workflow_exitcode, + ], + "env": [ + "user": user, // placeholder for any future okta + "hostname": host, + "os": [ + "name": opsys, + "version": opver + ], + "resource": [ + "cpus": cpus, + "memory": null, // placeholder, no point asking via Runtime as it will just give us the Xmx size + ], + "agent": get_meta(params.wf, "agent"), // access via original params + "epi2me": [ + "instance": get_meta(params.wf, "epi2me_instance"), + "user": user, + ], + "nextflow": [ + "version": nextflow.version.toString(), + "version_compat": nextflow.version.matches(workflow.manifest.nextflowVersion) + ] + ] + return body_json + } + // Send a JSON payload to a given endpoint + private static String send_ping_post(endpoint, body_json) { // Attempt to send payload and absorb any possible Exception gracefully String postResult boolean raise_exception = false try { - ((HttpURLConnection)new URL('https://ping.oxfordnanoportal.com/epilaby').openConnection()).with({ + ((HttpURLConnection)new URL("https://ping.oxfordnanoportal.com/${endpoint}").openConnection()).with({ requestMethod = 'POST' doOutput = true setConnectTimeout(5000) @@ -64,7 +169,8 @@ class Pinguscript { // Accessing inputStream.text will raise an Exception for failed requests postResult = inputStream.text }) - } catch(Exception e) { + } + catch(Exception e) { if(raise_exception) { throw e } } return (postResult) diff --git a/lib/fastqingress.nf b/lib/fastqingress.nf deleted file mode 100644 index a353996..0000000 --- a/lib/fastqingress.nf +++ /dev/null @@ -1,462 +0,0 @@ -import java.nio.file.NoSuchFileException - -import ArgumentParser - -EXTENSIONS = ["fastq", "fastq.gz", "fq", "fq.gz"] - -/** - * Take a map of input arguments, find valid inputs, and return a channel - * with elements of `[metamap, seqs.fastq.gz, path-to-fastcat-stats]`. - * The last item is `null` if `fastcat` was not run. It is only run on directories - * containing more than one FASTQ file or when `fastcat_stats: true`. - * - * @param arguments: map with arguments containing - * - "input": path to either: (i) input FASTQ file, (ii) top-level directory containing - * FASTQ files, (iii) directory containing sub-directories which contain FASTQ - * files - * - "sample": string to name single sample - * - "sample_sheet": path to CSV sample sheet - * - "analyse_unclassified": boolean whether to keep unclassified reads - * - "fastcat_stats": boolean whether to write the `fastcat` stats - * @return Channel of `[Map(alias, barcode, type, ...), Path, Path|null]`. - * The first element is a map with metadata, the second is the path to the - * `.fastq.gz` file with the (potentially concatenated) sequences and the third is - * the path to the directory with the fastcat statistics (or `null` if `fastcat` - * wasn't run). - */ -def fastq_ingress(Map arguments) -{ - // check arguments - Map margs = parse_arguments(arguments) - // define the channel for holding the inputs [metamap, input_path]. 
It will be - // either filled by `watchPath` (only emitting files) or by the data of the three - // input types (single file or dir with fastq or subdirs with fastq). - def ch_input - // handle `watchPath` case - if (margs["watch_path"]) { - ch_input = watch_path(margs) - } else { - // create a channel with the inputs (single file / dir with fastq / subdirs - // with fastq) - ch_input = get_valid_inputs(margs) - } - // `ch_input` might contain elements of `[metamap, null]` if there were entries in - // the sample sheet for which no FASTQ files were found. We put these into an extra - // channel and combine with the result channel before returning. - ch_input = ch_input.branch { meta, path -> - reads_found: path as boolean - no_reads_found: true - } - def ch_result - if (margs.fastcat_stats) { - // run fastcat regardless of input type - ch_result = fastcat(ch_input.reads_found, margs["fastcat_extra_args"]).map { - meta, reads, stats -> - // extract run_ids parsed by fastcat into metadata - ArrayList run_ids = stats.resolve("run_ids").splitText().collect { - it.strip() - } - // `meta + [...]` returns a new map which is handy to avoid any - // modifying-maps-in-closures weirdness - // See https://github.com/nextflow-io/nextflow/issues/2660 - [meta + [run_ids: run_ids], reads, stats] - } - } else { - // the fastcat stats were not requested --> run fastcat only on directories with - // more than one FASTQ file (and not on single files or directories with a - // single file) - def ch_branched = ch_input.reads_found.map {meta, path -> - // find directories with only a single FASTQ file and "unwrap" the file - if (path.isDirectory()) { - List fq_files = get_fq_files_in_dir(path) - if (fq_files.size() == 1) { - path = fq_files[0] - } - } - [meta, path] - } .branch { meta, path -> - // now there can only be two cases: - // (i) single FASTQ file (pass to `move_or_compress` later) - // (ii) dir with multiple fastq files (pass to `fastcat` later) - single_file: path.isFile() - dir_with_fastq_files: true - } - // call the respective processes on both branches and return - ch_result = fastcat( - ch_branched.dir_with_fastq_files, margs["fastcat_extra_args"] - ).concat( - ch_branched.single_file | move_or_compress | map { - meta, path -> [meta, path, null] - } - ) - } - return ch_result.concat(ch_input.no_reads_found.map { [*it, null] }) -} - - -/** - * Run `watchPath` on the input directory and return a channel [metamap, path-to-fastq]. - * The meta data is taken from the sample sheet in case one was provided. Otherwise it - * only contains the `alias` (either `margs["sample"]` or the name of the parent - * directory of the file). - * - * @param margs: map with parsed input arguments - * @return: Channel of [metamap, path-to-fastq] - */ -def watch_path(Map margs) { - // we have two cases to consider: (i) files being generated in the top-level - // directory and (ii) files being generated in sub-directories. If we find files of - // both kinds, throw an error. - Path input - try { - input = file(margs.input, checkIfExists: true) - } catch (NoSuchFileException e) { - error "Input path $margs.input does not exist." - } - if (input.isFile()) { - error "Input ($input) must be a directory when using `watch_path`." 
- } - // get existing FASTQ files first (look for relevant files in the top-level dir and - // all sub-dirs) - def ch_existing_input = Channel.fromPath(input) - | concat(Channel.fromPath("$input/*", type: 'dir')) - | map { get_fq_files_in_dir(it) } - | flatten - // now get channel with files found by `watchPath` - def ch_watched = Channel.watchPath("$input/**").until { it.name.startsWith('STOP') } - // only keep FASTQ files - | filter { - for (ext in EXTENSIONS) { - if (it.name.endsWith(ext)) return true - } - return false - } - // merge the channels - ch_watched = ch_existing_input | concat(ch_watched) - // check if input is as expected; start by throwing an error when finding files in - // top-level dir and sub-directories - String prev_input_type - ch_watched - | map { - String input_type = (it.parent == input) ? "top-level" : "sub-dir" - if (prev_input_type && (input_type != prev_input_type)) { - error "`watchPath` found FASTQ files in the top-level directory " + - "as well as in sub-directories." - } - // if file is in a sub-dir, make sure it's not a sub-sub-dir - if ((input_type == "sub-dir") && (it.parent.parent != input)) { - error "`watchPath` found a FASTQ file more than one level of " + - "sub-directories deep ('$it')." - } - // we also don't want files in the top-level dir when we got a sample sheet - if ((input_type == "top-level") && margs["sample_sheet"]) { - error "`watchPath` found files in top-level directory even though a " + - "sample sheet was provided ('${margs["sample_sheet"]}')." - } - prev_input_type = input_type - } - if (margs.sample_sheet) { - // add metadata from sample sheet (we can't use join here since it does not work - // with repeated keys; we therefore need to transform the sample sheet data into - // a map with the barcodes as keys) - def ch_sample_sheet = get_sample_sheet(file(margs.sample_sheet), margs.required_sample_types) - | collect - | map { it.collectEntries { [(it["barcode"]): it] } } - // now we can use this channel to annotate all files with the corresponding info - // from the sample sheet - ch_watched = ch_watched - | combine(ch_sample_sheet) - | map { file_path, sample_sheet_map -> - String barcode = file_path.parent.name - Map meta = sample_sheet_map[barcode] - // throw error if the barcode was not in the sample sheet - if (!meta) { - error "Sub-directory $barcode was not found in the sample sheet." - } - [meta, file_path] - } - } else { - ch_watched = ch_watched - | map { - // This file could be in the top-level dir or a sub-dir. In the first case - // check if a sample name was provided. In the second case, the alias is - // always the name of the sub-dir. 
- String alias - if (it.parent == input) { - // top-level dir - alias = margs["sample"] ?: it.parent.name - } else { - // sub-dir - alias = it.parent.name - } - [create_metamap([alias: alias]), it] - } - } - return ch_watched -} - - -process move_or_compress { - label "fastq_ingress" - label "wf_common" - cpus 1 - input: - // don't stage `input` with a literal because we check the file extension - tuple val(meta), path(input) - output: - tuple val(meta), path("seqs.fastq.gz") - script: - String out = "seqs.fastq.gz" - if (input.name.endsWith('.gz')) { - // we need to take into account that the file could already be named - // "seqs.fastq.gz" in which case `mv` would fail - """ - [ "$input" == "$out" ] || mv "$input" $out - """ - } else { - """ - cat "$input" | bgzip -@ $task.cpus > $out - """ - } -} - - -process fastcat { - label "fastq_ingress" - label "wf_common" - cpus 3 - input: - tuple val(meta), path("input") - val extra_args - output: - tuple val(meta), path("seqs.fastq.gz"), path("fastcat_stats") - script: - String out = "seqs.fastq.gz" - String fastcat_stats_outdir = "fastcat_stats" - """ - mkdir $fastcat_stats_outdir - fastcat \ - -s ${meta["alias"]} \ - -r $fastcat_stats_outdir/per-read-stats.tsv \ - -f $fastcat_stats_outdir/per-file-stats.tsv \ - $extra_args \ - input \ - | bgzip -@ $task.cpus > $out - csvtk cut -tf runid $fastcat_stats_outdir/per-read-stats.tsv | csvtk del-header | sort | uniq > $fastcat_stats_outdir/run_ids - """ -} - - -/** - * Parse input arguments for `fastq_ingress`. - * - * @param arguments: map with input arguments (see `fastq_ingress` for details) - * @return: map of parsed arguments - */ -Map parse_arguments(Map arguments) { - ArgumentParser parser = new ArgumentParser( - args:["input"], - kwargs:["sample": null, - "sample_sheet": null, - "analyse_unclassified": false, - "fastcat_stats": false, - "fastcat_extra_args": "", - "required_sample_types": [], - "watch_path": false], - name: "fastq_ingress") - return parser.parse_args(arguments) -} - - -/** - * Find valid inputs based on the input type. - * - * @param margs: parsed arguments (see `fastq_ingress` for details) - * @return: channel of `[metamap, input-path]`; `input-path` can be the path to - * a single FASTQ file or to a directory containing FASTQ files - */ -def get_valid_inputs(Map margs){ - log.info "Checking fastq input." - Path input - try { - input = file(margs.input, checkIfExists: true) - } catch (NoSuchFileException e) { - error "Input path $margs.input does not exist." 
- } - // declare resulting input channel and other variables needed in the outer scope - def ch_input - ArrayList sub_dirs_with_fastq_files - // handle case of `input` being a single file - if (input.isFile()) { - // the `fastcat` process can deal with directories or single file inputs - ch_input = Channel.of( - [create_metamap([alias: margs["sample"] ?: input.simpleName]), input]) - } else if (input.isDirectory()) { - // input is a directory --> we accept two cases: (i) a top-level directory with - // fastq files and no sub-directories or (ii) a directory with one layer of - // sub-directories containing fastq files - boolean dir_has_fastq_files = get_fq_files_in_dir(input) - // find potential sub-directories (and sub-dirs with FASTQ files; note that - // these lists can be empty) - ArrayList sub_dirs = file(input.resolve('*'), type: "dir") - sub_dirs_with_fastq_files = sub_dirs.findAll { get_fq_files_in_dir(it) } - // deal with first case (top-lvl dir with FASTQ files and no sub-directories - // containing FASTQ files) - if (dir_has_fastq_files) { - if (sub_dirs_with_fastq_files) { - error "Input directory '$input' cannot contain FASTQ " + - "files and sub-directories with FASTQ files." - } - ch_input = Channel.of( - [create_metamap([alias: margs["sample"] ?: input.baseName]), input]) - } else { - // deal with the second case (sub-directories with fastq data) --> first - // check whether we actually found sub-directories (and remove - // sub-directories called 'unclassified' unless otherwise specified) - if (!margs.analyse_unclassified) { - sub_dirs_with_fastq_files = sub_dirs_with_fastq_files.findAll { - it.baseName != "unclassified" - } - } - if (!sub_dirs_with_fastq_files) { - error "Input directory '$input' must contain either FASTQ files " + - "or sub-directories containing FASTQ files." - } - // make sure that there are no sub-sub-directories with FASTQ files and that - // the sub-directories actually contain fastq files) - if (sub_dirs.any { - ArrayList subsubdirs = file(it.resolve('*'), type: "dir") - subsubdirs.any { get_fq_files_in_dir(it) } - }) { - error "Input directory '$input' cannot contain more " + - "than one level of sub-directories with FASTQ files." - } - // filter based on sample sheet in case one was provided - if (margs.sample_sheet) { - // get channel of entries in the sample sheet - def ch_sample_sheet = get_sample_sheet(file(margs.sample_sheet), margs.required_sample_types) - // get the union of both channels (missing values will be replaced with - // `null`) - def ch_union = Channel.fromPath(sub_dirs_with_fastq_files).map { - [it.baseName, it] - }.join(ch_sample_sheet.map{[it.barcode, it]}, remainder: true) - // after joining the channels, there are three possible cases: - // (i) valid input path and sample sheet entry are both present - // (ii) there is a sample sheet entry but no corresponding input dir - // --> we'll emit `[metamap-from-sample-sheet-entry, null]` - // (iii) there is a valid path, but the sample sheet entry is missing - // --> drop this entry and print a warning to the log - ch_input = ch_union.map {barcode, path, sample_sheet_entry -> - if (sample_sheet_entry) { - [create_metamap(sample_sheet_entry), path] - } else { - log.warn "Input directory '$barcode' was found, but sample " + - "sheet '$margs.sample_sheet' has no such entry." 
- } - } - } else { - ch_input = Channel.fromPath(sub_dirs_with_fastq_files).map { - [create_metamap([alias: it.baseName, barcode: it.baseName]), it] - } - } - } - } else { - error "Input $input appears to be neither a file nor a directory." - } - // a sample sheet only makes sense in the case of a directory with - // sub-directories - if (margs.sample_sheet && !sub_dirs_with_fastq_files) { - error "Sample sheet was provided, but input does not contain " + - "sub-directories with FASTQ files." - } - return ch_input -} - - -/** - * Create a map that contains at least these keys: `[alias, barcode, type]`. - * `alias` is required, `barcode` and `type` are filled with default values if - * missing. Additional entries are allowed. - * - * @param kwargs: map with input parameters; must contain `alias` - * @return: map(alias, barcode, type, ...) - */ -Map create_metamap(Map arguments) { - ArgumentParser parser = new ArgumentParser( - args: ["alias"], - kwargs: [ - "barcode": null, - "type": "test_sample", - "run_ids": [], - ], - name: "create_metamap", - ) - def metamap = parser.parse_known_args(arguments) - metamap['alias'] = metamap['alias'].replaceAll(" ","_") - return metamap -} - - -/** - * Get the fastq files in the directory (non-recursive). - * - * @param dir: path to the target directory - * @return: list of found fastq files - */ -ArrayList get_fq_files_in_dir(Path dir) { - return EXTENSIONS.collect { file(dir.resolve("*.$it"), type: "file") } .flatten() -} - - -/** - * Check the sample sheet and return a channel with its rows if it is valid. - * - * @param sample_sheet: path to the sample sheet CSV - * @return: channel of maps (with values in sample sheet header as keys) - */ -def get_sample_sheet(Path sample_sheet, ArrayList required_sample_types) { - // If `validate_sample_sheet` does not return an error message, we can assume that - // the sample sheet is valid and parse it. However, because of Nextflow's - // asynchronous magic, we might emit values from `.splitCSV()` before the - // error-checking closure finishes. This is no big deal, but undesired nonetheless - // as the error message might be overwritten by the traces of new nextflow processes - // in STDOUT. Thus, we use the somewhat clunky construct with `concat` and `last` - // below. This lets the CSV channel only start to emit once the error checking is - // done. - ch_err = validate_sample_sheet(sample_sheet, required_sample_types).map { - // check if there was an error message - if (it) error "Invalid sample sheet: ${it}." - it - } - // concat the channel holding the path to the sample sheet to `ch_err` and call - // `.last()` to make sure that the error-checking closure above executes before - // emitting values from the CSV - return ch_err.concat(Channel.fromPath(sample_sheet)).last().splitCsv( - header: true, quote: '"' - ) -} - - -/** - * Python script for validating a sample sheet. The script will write messages - * to STDOUT if the sample sheet is invalid. In case there are no issues, no - * message is emitted. - * - * @param: path to sample sheet CSV - * @param: list of required sample types (optional) - * @return: string (optional) - */ -process validate_sample_sheet { - label "fastq_ingress" - label "wf_common" - input: - path "sample_sheet.csv" - val required_sample_types - output: stdout - script: - String req_types_arg = required_sample_types ? 
"--required_sample_types "+required_sample_types.join(" ") : "" - """ - workflow-glue check_sample_sheet sample_sheet.csv $req_types_arg - """ -} - diff --git a/lib/ingress.nf b/lib/ingress.nf new file mode 100644 index 0000000..5839730 --- /dev/null +++ b/lib/ingress.nf @@ -0,0 +1,749 @@ +import java.nio.file.NoSuchFileException + +import ArgumentParser + +enum InputType { + SingleFile, + TopLevelDir, + DirWithSubDirs, +} + +N_OPEN_FILES_LIMIT = 128 + + +/** +* Check if a file ends with one of the target extensions. +* +* @param file: path to the file in question +* @param extensions: list of valid file extensions +* @return: boolean whether the file has one of the provided extensions +*/ +def is_target_file(Path file, List extensions) { + extensions.any { ext -> file.name.endsWith(ext) } +} + + +/** + * Take a channel of the shape `[meta, reads, path-to-stats-dir | null]` and extract the + * run IDs from the `run_ids` file in the stats directory into the metamap. If the path + * to the stats dir is `null`, add an empty list. + * + * @param ch: input channel of shape `[meta, reads, path-to-stats-dir | null]` + * @return: channel with a list of run IDs added to the metamap + */ +def add_run_IDs_to_meta(ch) { + // HashSet for all observed run_ids + Set ingressed_run_ids = new HashSet() + + // extract run_ids from fastcat stats / bamstats results and add to metadata as well + // as `ingressed_run_ids` + ch = ch | map { meta, reads, stats -> + ArrayList run_ids = [] + if (stats) { + run_ids = stats.resolve("run_ids").splitText().collect { it.strip() } + ingressed_run_ids += run_ids + } + // `meta + [...]` returns a new map which is handy to avoid any + // modifying-maps-in-closures weirdness + // See https://github.com/nextflow-io/nextflow/issues/2660 + [meta + [run_ids: run_ids], reads, stats] + } + // put run_ids somewhere global for trivial access later + // bit grim but decouples ingress metadata from workflow main.nf + // additionally no need to use CWUtil as we're not overriding any user params + ch | subscribe(onComplete: { + params.wf["ingress.run_ids"] = ingressed_run_ids + }) + return ch +} + + +/** + * Take a map of input arguments, find valid FASTQ inputs, and return a channel + * with elements of `[metamap, seqs.fastq.gz | null, path-to-fastcat-stats | null]`. + * The second item is `null` for sample sheet entries without a matching barcode + * directory. The last item is `null` if `fastcat` was not run (it is only run on + * directories containing more than one FASTQ file or when `stats: true`). + * + * @param arguments: map with arguments containing + * - "input": path to either: (i) input FASTQ file, (ii) top-level directory containing + * FASTQ files, (iii) directory containing sub-directories which contain FASTQ + * files + * - "sample": string to name single sample + * - "sample_sheet": path to CSV sample sheet + * - "analyse_unclassified": boolean whether to keep unclassified reads + * - "stats": boolean whether to write the `fastcat` stats + * - "fastcat_extra_args": string with extra arguments to pass to `fastcat` + * - "required_sample_types": list of required sample types in the sample sheet + * - "watch_path": boolean whether to use `watchPath` and run in streaming mode + * @return: channel of `[Map(alias, barcode, type, ...), Path|null, Path|null]`. + * The first element is a map with metadata, the second is the path to the + * `.fastq.gz` file with the (potentially concatenated) sequences and the third is + * the path to the directory with the `fastcat` statistics. 
The second element is + * `null` for sample sheet entries for which no corresponding barcode directory was + * found. The third element is `null` if `fastcat` was not run. + */ +def fastq_ingress(Map arguments) +{ + // check arguments + Map margs = parse_arguments(arguments, ["fastcat_extra_args": ""]) + + ArrayList fq_extensions = [".fastq", ".fastq.gz", ".fq", ".fq.gz"] + + // `watch_path` will be handled within `get_valid_inputs()` + def input = get_valid_inputs(margs, fq_extensions) + + def ch_result + if (margs.stats) { + // run fastcat regardless of input type + ch_result = fastcat(input.files.mix(input.dirs), margs["fastcat_extra_args"]) + } else { + // run `fastcat` only on directories and rename / compress single files + ch_result = fastcat(input.dirs, margs["fastcat_extra_args"]) + | mix( + input.files + | move_or_compress_fq_file + | map { meta, path -> [meta, path, null] } + ) + } + // add sample sheet entries without barcode dirs to the results channel and extract + // the run IDs into the metamaps before returning + ch_result = ch_result.mix(input.missing.map { [*it, null] }) + return add_run_IDs_to_meta(ch_result) +} + + +/** + * Take a map of input arguments, find valid (u)BAM inputs, and return a channel + * with elements of `[metamap, reads.bam | null, path-to-bamstats-results | null]`. + * The second item is `null` for sample sheet entries without a matching barcode + * directory or samples containing only uBAM files when `keep_unaligned` is `false`. + * The last item is `null` if `bamstats` was not run (it is only run when `stats: true`). + * + * @param arguments: map with arguments containing + * - "input": path to either: (i) input (u)BAM file, (ii) top-level directory + * containing (u)BAM files, (iii) directory containing sub-directories which contain + * (u)BAM files + * - "sample": string to name single sample + * - "sample_sheet": path to CSV sample sheet + * - "analyse_unclassified": boolean whether to keep unclassified reads + * - "stats": boolean whether to run `bamstats` + * - "keep_unaligned": boolean whether to include uBAM files + * - "required_sample_types": list of required sample types in the sample sheet + * - "watch_path": boolean whether to use `watchPath` and run in streaming mode + * @return: channel of `[Map(alias, barcode, type, ...), Path|null, Path|null]`. + * The first element is a map with metadata, the second is the path to the + * `.bam` file with the (potentially merged) sequences and the third is + * the path to the directory with the `bamstats` statistics. The second element is + * `null` for sample sheet entries for which no corresponding barcode directory was + * found and for samples with only uBAM files when `keep_unaligned: false`. The third + * element is `null` if `bamstats` was not run. + */ +def xam_ingress(Map arguments) +{ + // check arguments + Map margs = parse_arguments(arguments, ["keep_unaligned": false]) + + // we only accept BAM or uBAM for now (i.e. no SAM or CRAM) + ArrayList xam_extensions = [".bam", ".ubam"] + + def input = get_valid_inputs(margs, xam_extensions) + + ch_result = input.dirs + | map { meta, path -> [meta, get_target_files_in_dir(path, xam_extensions)] } + | mix(input.files) + + ch_is_unaligned = ch_result + | checkBamHeaders + | map { meta, is_unaligned_env, mixed_headers_env -> + // convert the env. 
variables from strings ('0' or '1') into bools + boolean is_unaligned = is_unaligned_env as int as boolean + boolean mixed_headers = mixed_headers_env as int as boolean + // throw an error if there was a sample with mixed headers + if (mixed_headers) { + error "Found mixed headers in (u)BAM files of sample '${meta.alias}'." + } + [meta, is_unaligned] + } + + ch_result = ch_result | join(ch_is_unaligned) + // add `is_unaligned` to the metamap (note the use of `+` to create a copy of `meta` + // to avoid modifying every item in the channel; + // https://github.com/nextflow-io/nextflow/issues/2660) + | map { meta, paths, is_unaligned -> [meta + [is_unaligned: is_unaligned], paths] } + | branch { meta, paths -> + // set `paths` to `null` for uBAM samples if unallowed (they will be added to + // the results channel in shape of `[meta, null]` at the end of the function + // (alongside the sample sheet entries without matching barcode dirs) + if (!margs["keep_unaligned"] && meta["is_unaligned"]){ + paths = null + } + // get the number of files (`paths` can be a list, a single path, or `null`) + int n_files = paths instanceof List ? paths.size() : (paths ? 1 : 0) + // Preparations finished; we can do the branching now. There will be 3 branches + // depending on the number of files per sample and whether the reads are already + // aligned: + // * no_op_needed: no need to do anything; just add to the final results channel + // downstream + // - no files + // - a single unaligned file + // * to_catsort: `samtools cat` into `samtools sort` + // - a single aligned file + // - more than one unaligned file + // - too many aligned files to safely and quickly merge (`samtools merge` opens + // all files at the same time and some machines might have low limits for + // open file descriptors) + // * to_merge: flatMap > sort > group > merge + // - between 1 and `N_OPEN_FILES_LIMIT` aligned files + no_op_needed: (n_files == 0) || (n_files == 1 && meta["is_unaligned"]) + to_catsort: \ + (n_files == 1) || (n_files > N_OPEN_FILES_LIMIT) || meta["is_unaligned"] + to_merge: true + } + + // deal with samples with few-enough files for `samtools merge` first + ch_merged = ch_result.to_merge + | flatMap { meta, paths -> paths.collect { [meta, it] } } + | sortBam + | groupTuple + | mergeBams + + // now handle samples with too many files for `samtools merge` + ch_catsorted = ch_result.to_catsort + | catSortBams + + ch_result = input.missing + | mix( + ch_result.no_op_needed, + ch_merged, + ch_catsorted, + ) + + // run `bamstats` if requested + if (margs["stats"]) { + // branch and run `bamstats` only on the non-`null` paths + ch_result = ch_result.branch { meta, path -> + has_reads: path + is_null: true + } + ch_bamstats = bamstats(ch_result.has_reads) + ch_result = add_run_IDs_to_meta(ch_bamstats) | mix(ch_result.is_null) + } else { + // add `null` instead of path to `bamstats` results dir + ch_result = ch_result | map { meta, bam -> [meta, bam, null] } + } + return ch_result +} + + +process checkBamHeaders { + label "ingress" + label "wf_common" + cpus 1 + input: tuple val(meta), path("input_dir/reads*.bam") + output: + // set the two env variables by `eval`-ing the output of the python script + // checking the XAM headers + tuple val(meta), env(IS_UNALIGNED), env(MIXED_HEADERS) + script: + """ + workflow-glue check_bam_headers_in_dir input_dir > env.vars + source env.vars + """ +} + + +process mergeBams { + label "ingress" + label "wf_common" + cpus 3 + input: tuple val(meta), path("input_bams/reads*.bam") + output: 
tuple val(meta), path("reads.bam") + shell: + """ + samtools merge -@ ${task.cpus - 1} \ + -b <(find input_bams -name 'reads*.bam') -o reads.bam + """ +} + + +process catSortBams { + label "ingress" + label "wf_common" + cpus 4 + input: tuple val(meta), path("input_bams/reads*.bam") + output: tuple val(meta), path("reads.bam") + script: + """ + samtools cat -b <(find input_bams -name 'reads*.bam') \ + | samtools sort - -@ ${task.cpus - 2} -o reads.bam + """ +} + + +process sortBam { + label "ingress" + label "wf_common" + cpus 3 + input: tuple val(meta), path("reads.bam") + output: tuple val(meta), path("reads.sorted.bam") + script: + """ + samtools sort -@ ${task.cpus - 1} reads.bam -o reads.sorted.bam + """ +} + + +process bamstats { + label "ingress" + label "wf_common" + cpus 3 + input: + tuple val(meta), path("reads.bam") + output: + tuple val(meta), path("reads.bam"), path("bamstats_results") + script: + def bamstats_threads = Math.max(1, task.cpus - 1) + """ + mkdir bamstats_results + bamstats reads.bam -s $meta.alias -u \ + -f bamstats_results/bamstats.flagstat.tsv -t $bamstats_threads \ + | bgzip > bamstats_results/bamstats.readstats.tsv.gz + + # extract the run IDs from the per-read stats + csvtk cut -tf runid bamstats_results/bamstats.readstats.tsv.gz \ + | csvtk del-header | sort | uniq > bamstats_results/run_ids + """ +} + + +/** + * Run `watchPath` on the input directory and return a channel of shape [metamap, + * path-to-target-file]. The meta data is taken from the sample sheet in case one was + * provided. Otherwise it only contains the `alias` (either `margs["sample"]` or the + * name of the parent directory of the file). + * + * @param input: path to a directory to watch + * @param margs: Map with parsed input arguments + * @param extensions: list of valid extensions for the target file type + * @return: Channel of [metamap, path-to-target-file] + */ +def watch_path(Path input, Map margs, ArrayList extensions) { + // we have two cases to consider: (i) files being generated in the top-level + // directory and (ii) files being generated in sub-directories. If we find files of + // both kinds, throw an error. + if (input.isFile()) { + error "Input ($input) must be a directory when using `watch_path`." + } + // get existing target files first (look for relevant files in the top-level dir and + // all sub-dirs) + def ch_existing_input = Channel.fromPath(input) + | concat(Channel.fromPath("$input/*", type: 'dir')) + | map { get_target_files_in_dir(it, extensions) } + | flatten + // now get channel with files found by `watchPath` + def ch_watched = Channel.watchPath("$input/**").until { it.name.startsWith('STOP') } + // only keep target files + | filter { is_target_file(it, extensions) } + // merge the channels + ch_watched = ch_existing_input | concat(ch_watched) + // check if input is as expected; start by throwing an error when finding files in + // top-level dir and sub-directories + String prev_input_type + ch_watched + | map { + String input_type = (it.parent == input) ? "top-level" : "sub-dir" + if (prev_input_type && (input_type != prev_input_type)) { + error "`watchPath` found input files in the top-level directory " + + "as well as in sub-directories." + } + // if file is in a sub-dir, make sure it's not a sub-sub-dir + if ((input_type == "sub-dir") && (it.parent.parent != input)) { + error "`watchPath` found an input file more than one level of " + + "sub-directories deep ('$it')." 
+ } + // we also don't want files in the top-level dir when we got a sample sheet + if ((input_type == "top-level") && margs["sample_sheet"]) { + error "`watchPath` found input files in top-level directory even though " + + "a sample sheet was provided ('${margs["sample_sheet"]}')." + } + prev_input_type = input_type + } + if (margs.sample_sheet) { + // add metadata from sample sheet (we can't use join here since it does not work + // with repeated keys; we therefore need to transform the sample sheet data into + // a map with the barcodes as keys) + def ch_sample_sheet = get_sample_sheet(file(margs.sample_sheet), margs.required_sample_types) + | collect + | map { it.collectEntries { [(it["barcode"]): it] } } + // now we can use this channel to annotate all files with the corresponding info + // from the sample sheet + ch_watched = ch_watched + | combine(ch_sample_sheet) + | map { file_path, sample_sheet_map -> + String barcode = file_path.parent.name + Map sample_sheet_entry = sample_sheet_map[barcode] + // throw error if the barcode was not in the sample sheet + if (!sample_sheet_entry) { + error "Sub-directory $barcode was not found in the sample sheet." + } + [create_metamap(sample_sheet_entry), file_path] + } + } else { + ch_watched = ch_watched + | map { + // This file could be in the top-level dir or a sub-dir. In the first case + // check if a sample name was provided. In the second case, the alias is + // always the name of the sub-dir. + String alias + if (it.parent == input) { + // top-level dir + alias = margs["sample"] ?: it.parent.name + } else { + // sub-dir + alias = it.parent.name + } + [create_metamap([alias: alias]), it] + } + } + return ch_watched +} + + +process move_or_compress_fq_file { + label "ingress" + label "wf_common" + cpus 1 + input: + // don't stage `input` with a literal because we check the file extension + tuple val(meta), path(input) + output: + tuple val(meta), path("seqs.fastq.gz") + script: + String out = "seqs.fastq.gz" + if (input.name.endsWith('.gz')) { + // we need to take into account that the file could already be named + // "seqs.fastq.gz" in which case `mv` would fail + """ + [ "$input" == "$out" ] || mv "$input" $out + """ + } else { + """ + cat "$input" | bgzip -@ $task.cpus > $out + """ + } +} + + +process fastcat { + label "ingress" + label "wf_common" + cpus 3 + input: + tuple val(meta), path("input") + val extra_args + output: + tuple val(meta), path("seqs.fastq.gz"), path("fastcat_stats") + script: + String out = "seqs.fastq.gz" + String fastcat_stats_outdir = "fastcat_stats" + """ + mkdir $fastcat_stats_outdir + fastcat \ + -s ${meta["alias"]} \ + -r >(bgzip -c > $fastcat_stats_outdir/per-read-stats.tsv.gz) \ + -f $fastcat_stats_outdir/per-file-stats.tsv \ + $extra_args \ + input \ + | bgzip > $out + + # extract the run IDs from the per-read stats + csvtk cut -tf runid $fastcat_stats_outdir/per-read-stats.tsv.gz \ + | csvtk del-header | sort | uniq > $fastcat_stats_outdir/run_ids + """ +} + + +/** + * Parse input arguments for `fastq_ingress` or `xam_ingress`. 
+ * + * @param arguments: map with input arguments (see the corresponding ingress function + * for details) + * @param extra_kwargs: map of extra keyword arguments and their defaults (this allows + * the argument-parsing to be tailored to a particular ingress function) + * @return: map of parsed arguments + */ +Map parse_arguments(Map arguments, Map extra_kwargs=[:]) { + ArrayList required_args = ["input"] + Map default_kwargs = [ + "sample": null, + "sample_sheet": null, + "analyse_unclassified": false, + "stats": true, + "required_sample_types": [], + "watch_path": false + ] + ArgumentParser parser = new ArgumentParser( + args: required_args, + kwargs: default_kwargs + extra_kwargs, + name: "fastq_ingress") + return parser.parse_args(arguments) +} + + +/** + * Find valid inputs based on the target extensions and return a branched channel with + * branches `missing`, `files` and `dir`, which are of the shape `[metamap, input_path | + * null]` (with `input_path` pointing to a target file or a directory containing target + * files, respectively). `missing` contains sample sheet entries for which no + * corresponding barcodes were found. + * Unless `watchPath` was requested, the function checks whether the input is a single + * target file, a top-level directory with target files, or a directory containing + * sub-directories (usually barcodes) with target files. + * + * @param margs: parsed arguments (see `fastq_ingress` and `xam_ingress` for details) + * @param extensions: list of valid extensions for the target file type + * @return: branched channel with branches `missing`, `dir`, and `files` +*/ +def get_valid_inputs(Map margs, ArrayList extensions){ + log.info "Searching input for $extensions files." + Path input + try { + input = file(margs.input, checkIfExists: true) + } catch (NoSuchFileException e) { + error "Input path $margs.input does not exist." + } + // declare resulting input channel + def ch_input + // run `watchPath` if requested + if (margs["watch_path"]) { + ch_input = watch_path(input, margs, extensions) + } else { + // check which of the allowed input types (single file, top-lvl dir, dir with + // sub-dirs) we got + InputType input_type = determine_input_type( + input, extensions, margs.analyse_unclassified + ) + // handle case of `input` being a single file + if (input_type == InputType.SingleFile) { + ch_input = Channel.of( + [create_metamap([alias: margs["sample"] ?: input.simpleName]), input]) + } else if (input_type == InputType.TopLevelDir) { + // input is a directory containing target files + ch_input = Channel.of( + [create_metamap([alias: margs["sample"] ?: input.baseName]), input]) + } else { + // input is a directory with sub-directories (e.g. 
barcodes) containing + // target files --> find these sub-directories + ArrayList sub_dirs_with_target_files = file( + input.resolve('*'), type: "dir" + ).findAll { get_target_files_in_dir(it, extensions) } + // remove directories called 'unclassified' unless otherwise specified + if (!margs.analyse_unclassified) { + sub_dirs_with_target_files = sub_dirs_with_target_files.findAll { + it.baseName != "unclassified" + } + } + // filter based on sample sheet in case one was provided + if (margs.sample_sheet) { + // get channel of entries in the sample sheet + def ch_sample_sheet = get_sample_sheet( + file(margs.sample_sheet), margs.required_sample_types + ) + // get the union of both channels (missing values will be replaced with + // `null`) + def ch_union = Channel.fromPath(sub_dirs_with_target_files).map { + [it.baseName, it] + }.join(ch_sample_sheet.map{[it.barcode, it]}, remainder: true) + // after joining the channels, there are three possible cases: + // (i) valid input path and sample sheet entry are both present + // (ii) there is a sample sheet entry but no corresponding input dir + // --> we'll emit `[metamap-from-sample-sheet-entry, null]` + // (iii) there is a valid path, but the sample sheet entry is missing + // --> drop this entry and print a warning to the log + ch_input = ch_union.map {barcode, path, sample_sheet_entry -> + if (sample_sheet_entry) { + [create_metamap(sample_sheet_entry), path] + } else { + log.warn "Input directory '$barcode' was found, but sample " + + "sheet '$margs.sample_sheet' has no such entry." + } + } + } else { + // no sample sheet --> simply emit the sub-dirs with the target files + ch_input = Channel.fromPath(sub_dirs_with_target_files).map { + [create_metamap([alias: it.baseName, barcode: it.baseName]), it] + } + } + } + } + // finally, we "unwrap" directories containing only a single file and then split the + // results channel into the three different output types (sample sheet entries + // without corresponding barcodes -- i.e. with `path == null`, single files, and + // dirs with multiple files) + def ch_branched_results = ch_input.map { meta, path -> + if (path && path.isDirectory()) { + List fq_files = get_target_files_in_dir(path, extensions) + if (fq_files.size() == 1) { + path = fq_files[0] + } + } + [meta, path] + } .branch { meta, path -> + missing: !path + files: path.isFile() + dirs: path.isDirectory() + } + return ch_branched_results +} + +/** + * Determine which of the allowed categories (single file, top-level directory, or + * directory with sub-directory) an input path belongs to. + * + * @param margs: parsed arguments (see `fastq_ingress()` or `xam_ingress()` for details) + * @param extensions: list of valid extensions for the target file type + * @return: input type represented as an instance of the `InputType` enum + */ +InputType determine_input_type( + Path input, ArrayList extensions, boolean analyse_unclassified +) { + if (input.isFile()) { + if (!is_target_file(input, extensions)) { + error "Input file is not of required file type." + } + return InputType.SingleFile + } else if (!input.isDirectory()){ + error "Input $input appears to be neither a file nor a directory." + } + // `input` is a directory --> we accept two cases: (i) a top-level directory with + // target files and no sub-directories or (ii) a directory with one layer of + // sub-directories containing target files. 
First, check if the directory contains + // target files and find potential sub-directories (and sub-dirs with target files; + // note that these lists can be empty) + boolean dir_has_target_files = get_target_files_in_dir(input, extensions) + ArrayList sub_dirs = file(input.resolve('*'), type: "dir") + ArrayList sub_dirs_with_target_files = sub_dirs.findAll { + get_target_files_in_dir(it, extensions) + }.findAll { it.baseName != "unclassified" || analyse_unclassified } + + // define string to re-use in error messages below + String target_files_str = \ + "target files (ending in ${extensions.collect{'\'' + it + '\''}.join(' / ')})" + + // check for target files in the top-level dir; if there are any, make sure there + // are no sub-directories containing target files + if (dir_has_target_files) { + if (sub_dirs_with_target_files) { + error "Input directory '$input' cannot contain $target_files_str " + + "and also sub-directories with such files." + } + return InputType.TopLevelDir + } + + // no target files in the top-level dir --> make sure there were sub-dirs with + // target files + if (!sub_dirs_with_target_files) { + error "Input directory '$input' must contain either $target_files_str " + + "or sub-directories containing such files (no more than one layer deep)." + } + // we don't allow sub-sub-directories with target files + if (sub_dirs.any { + ArrayList subsubdirs = file(it.resolve('*'), type: "dir") + subsubdirs.any { get_target_files_in_dir(it, extensions) } + }) { + error "Input directory '$input' cannot contain more " + + "than one level of sub-directories with $target_files_str." + } + return InputType.DirWithSubDirs +} + + +/** + * Create a map that contains at least these keys: `[alias, barcode, type]`. + * `alias` is required, `barcode` and `type` are filled with default values if + * missing. Additional entries are allowed. + * + * @param arguments: map with input parameters; must contain `alias` + * @return: map(alias, barcode, type, ...) + */ +Map create_metamap(Map arguments) { + ArgumentParser parser = new ArgumentParser( + args: ["alias"], + kwargs: [ + "barcode": null, + "type": "test_sample", + "run_ids": [], + ], + name: "create_metamap", + ) + def metamap = parser.parse_known_args(arguments) + metamap['alias'] = metamap['alias'].replaceAll(" ","_") + return metamap +} + + +/** + * Get the target files in the directory (non-recursive). + * + * @param dir: path to the target directory + * @param extensions: list of valid extensions for the target file type + * @return: list of found target files + */ +ArrayList get_target_files_in_dir(Path dir, ArrayList extensions) { + file(dir.resolve("*")).findAll { is_target_file(it, extensions) } +} + + +/** + * Check the sample sheet and return a channel with its rows if it is valid. + * + * @param sample_sheet: path to the sample sheet CSV + * @return: channel of maps (with values in sample sheet header as keys) + */ +def get_sample_sheet(Path sample_sheet, ArrayList required_sample_types) { + // If `validate_sample_sheet` does not return an error message, we can assume that + // the sample sheet is valid and parse it. However, because of Nextflow's + // asynchronous magic, we might emit values from `.splitCSV()` before the + // error-checking closure finishes. This is no big deal, but undesired nonetheless + // as the error message might be overwritten by the traces of new nextflow processes + // in STDOUT. Thus, we use the somewhat clunky construct with `concat` and `last` + // below. 
This lets the CSV channel only start to emit once the error checking is + // done. + ch_err = validate_sample_sheet(sample_sheet, required_sample_types).map { + // check if there was an error message + if (it) error "Invalid sample sheet: ${it}." + it + } + // concat the channel holding the path to the sample sheet to `ch_err` and call + // `.last()` to make sure that the error-checking closure above executes before + // emitting values from the CSV + return ch_err.concat(Channel.fromPath(sample_sheet)).last().splitCsv( + header: true, quote: '"' + ) +} + + +/** + * Python script for validating a sample sheet. The script will write messages + * to STDOUT if the sample sheet is invalid. In case there are no issues, no + * message is emitted. + * + * @param: path to sample sheet CSV + * @param: list of required sample types (optional) + * @return: string (optional) + */ +process validate_sample_sheet { + cpus 1 + label "ingress" + label "wf_common" + input: + path "sample_sheet.csv" + val required_sample_types + output: stdout + script: + String req_types_arg = required_sample_types ? "--required_sample_types "+required_sample_types.join(" ") : "" + """ + workflow-glue check_sample_sheet sample_sheet.csv $req_types_arg + """ +} diff --git a/main.nf b/main.nf index 56e3db5..64c9aaf 100644 --- a/main.nf +++ b/main.nf @@ -3,7 +3,7 @@ import groovy.json.JsonBuilder nextflow.enable.dsl = 2 -include { fastq_ingress } from './lib/fastqingress' +include { fastq_ingress } from './lib/ingress' include { nextcladeVersionChecker } from './lib/nextclade' process checkSampleSheet { @@ -162,7 +162,7 @@ process report { cpus 1 input: path "depth_stats/*" - path fastcat_stats + path "per_read_stats/?.gz" path "nextclade.json" path nextclade_errors path "pangolin.csv" @@ -201,7 +201,7 @@ process report { --max_len $params._max_len \ --report_depth $params.report_depth \ --depths depth_stats/* \ - --fastcat_stats $fastcat_stats \ + --fastcat_stats per_read_stats/* \ --bcftools_stats vcf_stats/* $genotype \ --versions versions \ --params params.json \ @@ -443,7 +443,7 @@ workflow pipeline { // report html_doc = report( artic.depth_stats.collect(), - samples | map { it[2].resolve("per-read-stats.tsv") } | collectFile(keepHeader: true), + samples.map { it[2].resolve("per-read-stats.tsv.gz") }.toList(), clades[0].collect(), clades[1].collect(), pangolin.out.report.collect(), @@ -483,9 +483,7 @@ WorkflowMain.initialise(workflow, params, log) workflow { - if (params.disable_ping == false) { - Pinguscript.ping_post(workflow, "start", "none", params.out_dir, params) - } + Pinguscript.ping_start(nextflow, workflow, params) c_green = params.monochrome_logs ? '' : "\033[0;32m"; c_reset = params.monochrome_logs ? 
'' : "\033[0m"; @@ -641,7 +639,7 @@ workflow { "input":params.fastq, "sample":params.sample, "sample_sheet":params.sample_sheet, - "fastcat_stats": true, + "stats": true, "analyse_unclassified":params.analyse_unclassified]) results = pipeline(samples, scheme_dir, params._scheme_name, params._scheme_version, reference, @@ -649,13 +647,9 @@ workflow { output(results) } - -if (params.disable_ping == false) { - workflow.onComplete { - Pinguscript.ping_post(workflow, "end", "none", params.out_dir, params) - } - - workflow.onError { - Pinguscript.ping_post(workflow, "error", "$workflow.errorMessage", params.out_dir, params) - } +workflow.onComplete { + Pinguscript.ping_complete(nextflow, workflow, params) +} +workflow.onError { + Pinguscript.ping_error(nextflow, workflow, params) } diff --git a/nextflow.config b/nextflow.config index 8094f1d..f1262d7 100644 --- a/nextflow.config +++ b/nextflow.config @@ -58,7 +58,7 @@ params { "--scheme_name 'SARS-CoV-2'", "--scheme_version 'Midnight-ONT/V3'" ] - common_sha = "sha0a6dc21fac17291f4acb2e0f67bcdec7bf63e6b7" + common_sha = "sha91452ece4f647f62b32dac3a614635a6f0d7f8b5" container_sha = 'shaa5485f2d1c9085c23b266273556a4ce01e5e0dd9' nextclade_sha = 'shae56aff3b5b498b8cb950993692f914033397f8da' pangolin_sha = 'shae304dd3bc308a519f26908eb9d5ffa7686131d17'