From c6d92ed2c542af5f92aabbbfb12e276bb51f88ad Mon Sep 17 00:00:00 2001 From: james hadfield Date: Mon, 2 Dec 2024 17:10:56 +1300 Subject: [PATCH] WIP Allow different input sources By having all phylogenetic workflows start from two lists of inputs (`config.inputs`, `config.additional_inputs`) we enable a broad range of uses with a consistent interface. 1. Using local ingest files is trivial (see added docs) and doesn't need a bunch of special-cased logic that is prone to falling out of date (as it had indeed done) 2. Adding extra / private data follows the similar pattern, with an additional config list being used so that we are explicit that the new data is additional and enforce an ordering which is needed for predictable `augur merge` behaviour. The canonical data can be removed / replaced via step (1) if needed. I considered adding additional data after the subtype-filtering step, which would avoid the need to add subtype in the metadata but requires encoding this in the config overlay. I felt the chosen way was simpler and more powerful. Note that this workflow uses an old version of the CI workflow, which copies `example_data`. We could upgrade to the latest version and use a config overlay to swap out the canonical inputs with the example data. --- README.md | 57 +++++++++++++- Snakefile | 125 +++++++++++++++++++++++-------- gisaid/config.yaml | 12 ++- h5n1-cattle-outbreak/config.yaml | 12 ++- 4 files changed, 158 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index be7b1d2..31cf8e8 100755 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ The Snakemake pipeline is parameterised by two config files, one for the A/H5N1, * [Running H5N1 Cattle Outbreak (2024) builds](#running-h5n1-cattle-outbreak-2024-builds) * [Creating a custom build via config overlays](#creating-a-custom-build-via-config-overlays) * [Running builds in a separate working directory](#running-builds-in-a-separate-working-directory) +* [Adding private data](#adding-private-data) * [Cleavage Site Annotations](#cleavage-side-annotations) * [H5 Clade Labeling](#h5-clade-labeling) * [Other build customisations](#other-build-customisations) @@ -181,6 +182,37 @@ Depending on how you run builds this can be very liberating; for instance if you > You don't have to name it `config.yaml`, but if you use a different filename you'll have to specify it via `--configfile `. +## Adding private data + +Private metadata and/or sequences can be supplied by defining `additional_inputs` in a config overlay YAML. + +```yaml +additional_inputs: + - name: secret + metadata: secret.tsv + sequences: + ha: secret_ha.fasta +``` + +The filenames here can be S3 URIs (ensure you have credentials set in your environment) or local files. +In this case local files should be specified relative to the analysis directory (typically where you run the command from). + +If you have data for all segments you can use a slightly different and more concise syntax: +```yaml +additional_inputs: + - name: secret + metadata: secret.tsv + sequencs: secret_{segment}.fasta +``` + +> These added data will be subject to the same filtering rules as the starting data. + At a minimum, you'll want to ensure new sequences have metadata which defines their subtype and date, as filter steps will prune out those without valid values here. + +> Metadata merging is via `augur merge` and we add one-hot columns for each input source with the column name `input_{NAME}`, for instance the above example would have a `input_secret` column with values of `1` for metadata rows which were included in `secret.tsv` and `0` otherwise. + You can use this for additional filtering commands as needed. + +By default each workflow config defines a single metadata input and one FASTA per segment. + ## Cleavage Site Annotations @@ -224,8 +256,29 @@ Note that you may need to remove any existing data in `results/` in order for sn ### Using locally ingested data (instead of downloading from S3) -Run the pipeline with `--config 'local_ingest=True'` to use the locally available files produced by the ingest pipeline (see `./ingest/README.md` for details on how to run). -Specifically, the files needed are `ingest/results/metadata.tsv` and `ingest/results/sequences_{SEGMENT}.fasta`. +Each workflow defines an input via `config['inputs']`, e.g. the GISAID build uses: +```yaml +inputs: + - name: gisaid + metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst + sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst +``` + +You can use an approach analogous to the [addition of private data](#adding-private-data) above to replace this array in your config overlay to point to local files instead. +If you have run the default ingest pipeline a config overlay of + +```yaml +inputs: + - name: ingest + metadata: ingest/fauna/results/metadata.tsv + sequences: ingest/fauna/results/sequences_{segment}.fasta +``` + +Will result in the default inputs being replaced by paths to these local ingest files. +The search order for local files is: + 1. Relative to the analysis directory + 2. Relative to the entry snakefile + 3. Relative to the `avian-flu` directory ### To modify this build to work with your own data diff --git a/Snakefile b/Snakefile index 3f7accc..b6fe123 100755 --- a/Snakefile +++ b/Snakefile @@ -148,14 +148,6 @@ def sanity_check_config(): print("-"*80, file=sys.stderr) raise InvalidConfigError("No config") - assert LOCAL_INGEST or S3_SRC, "The config must define either 's3_src' or 'local_ingest'" - # NOTE: we could relax the following exclusivity of S3_SRC and LOCAL_INGEST - # if we want to use `--config local_ingest=gisaid` overrides. - assert not (S3_SRC and LOCAL_INGEST), "The config defined both 'local_ingest' and 's3_src', which are mutually exclusive" - if S3_SRC: - assert isinstance(S3_SRC, dict) and all([k in S3_SRC for k in ("name", "sequences", "metadata")]), \ - "Config 's3_src' must be a dict with 'name', 'sequences' and 'metadata' keys" - sanity_check_config() def as_list(x): @@ -233,48 +225,115 @@ rule files: files = rules.files.params -rule download_sequences: +rule download_s3_sequences: output: - sequences = f"data/{S3_SRC.get('name', None)}/sequences_{{segment}}.fasta", + metadata = "data/{input_name}/sequences_{segment}.fasta", params: - address=lambda w: S3_SRC.get('sequences', None).format(segment=w.segment), - no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('sequences', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else "" + address=lambda w: input_by_name(w.input_name, segment=w.segment).format(segment=w.segment), + no_sign_request=lambda w: "--no-sign-request" if input_by_name(w.input_name, segment=w.segment).startswith(NEXTSTRAIN_PUBLIC_BUCKET) else "" shell: """ - aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.sequences} + aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.metadata} """ -rule download_metadata: +rule download_s3_metadata: output: - metadata = f"data/{S3_SRC.get('name', None)}/metadata.tsv", + metadata = "data/{input_name}/metadata.tsv", params: - address=S3_SRC.get('metadata', None), - no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('metadata', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else "" + address=lambda w: input_by_name(w.input_name, metadata=True), + no_sign_request=lambda w: "--no-sign-request" if input_by_name(w.input_name, metadata=True).startswith(NEXTSTRAIN_PUBLIC_BUCKET) else "" shell: """ aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.metadata} """ +def _segment_input(info, segment): + address = info.get('sequences', None) + if address is None: + return address + elif isinstance(address, str): + return address + elif isinstance(address, dict): + return address.get(segment, None) + raise InvalidConfigError(f"Invalid structure for one or more provided (additional) input sequences") -def input_metadata(wildcards): - if S3_SRC: - return f"data/{S3_SRC['name']}/metadata.tsv", - elif LOCAL_INGEST: - return f"ingest/{LOCAL_INGEST}/results/metadata.tsv", - raise Exception() # already caught by `sanity_check_config` above, this is just being cautious +def input_by_name(name, metadata=False, segment=False): + assert (metadata and not segment) or (segment and not metadata), "Workflow bug" + info = next((c for c in [*config['inputs'], *config.get('additional_inputs', [])] if c['name']==name)) + return info.get('metadata', None) if metadata else _segment_input(info, segment) -def input_sequences(wildcards): - if S3_SRC: - return f"data/{S3_SRC['name']}/sequences_{wildcards.segment}.fasta", - elif LOCAL_INGEST: - return f"ingest/{LOCAL_INGEST}/results/sequences_{wildcards.segment}.fasta" - raise Exception() # already caught by `sanity_check_config` above, this is just being cautious +def collect_inputs(metadata=None, segment=None, augur_merge=False): + """ + This function is pure - subsequent calls will return the same results. + """ + assert (metadata and not segment) or (segment and not metadata), "Workflow bug" + + inputs = [] + for source in [*config['inputs'], *config.get('additional_inputs', [])]: + name = source['name'] + + address = source.get('metadata', None) if metadata else _segment_input(source, segment) + if address is None: + continue # inputs can define (e.g.) only metadata, or only sequences for some segments etc + + # addresses may be a remote filepath or a local file + if address.startswith('s3://'): + download_path = f"data/{source['name']}/metadata.tsv" \ + if metadata \ + else f"data/{source['name']}/sequences_{segment}.fasta" + inputs.append((name, download_path)) + continue + elif address.startswith(r'http[s]:\/\/'): + raise InvalidConfigError("Workflow cannot yet handle HTTP[S] inputs") + inputs.append((name, resolve_config_path(address, {'segment':segment}))) + + if not inputs: + raise InvalidConfigError("No inputs provided with defined metadata") + + if augur_merge: + return " ".join([f"{x[0]}={x[1]}" for x in inputs]) + return [x[1] for x in inputs] + +rule merge_metadata: + """ + This rule should only be invoked if there are multiple defined metadata inputs + (config.inputs + config.additional_inputs) + """ + input: + metadata = collect_inputs(metadata=True) + params: + metadata = collect_inputs(metadata=True, augur_merge=True) + output: + metadata = "results/metadata_merged.tsv" + shell: + r""" + augur merge \ + --metadata {params.metadata} \ + --source-columns 'input_{{NAME}}' \ + --output-metadata {output.metadata} + """ +rule merge_sequences: + """ + Placeholder rule while we want for `augur merge` to support sequences + """ + input: + metadata = lambda w: collect_inputs(segment=w.segment) + output: + metadata = "results/sequences_merged_{segment}.fasta" + shell: + r""" + cat {input.metadata} > {output.metadata} + """ rule filter_sequences_by_subtype: input: - sequences = input_sequences, - metadata = input_metadata, + sequences = lambda w: collect_inputs(segment=w.segment)[0] + if len(collect_inputs(segment=w.segment))==1 + else f"results/sequences_merged_{w.segment}.fasta", + metadata = collect_inputs(metadata=True)[0] + if len(collect_inputs(metadata=True))==1 + else "results/metadata_merged.tsv" output: sequences = "results/{subtype}/{segment}/sequences.fasta", params: @@ -290,7 +349,9 @@ rule filter_sequences_by_subtype: rule filter_metadata_by_subtype: input: - metadata = input_metadata, + metadata = collect_inputs(metadata=True)[0] + if len(collect_inputs(metadata=True))==1 + else "results/metadata_merged.tsv" output: metadata = "results/{subtype}/metadata.tsv", params: diff --git a/gisaid/config.yaml b/gisaid/config.yaml index 779d2df..cb7f0c6 100644 --- a/gisaid/config.yaml +++ b/gisaid/config.yaml @@ -29,13 +29,11 @@ builds: target_patterns: - "auspice/avian-flu_{subtype}_{segment}_{time}.json" -#### Parameters which define the input source #### -s3_src: - name: gisaid - metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst - sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst -local_ingest: false -# P.S. To use local ingest files, comment out s3_src and change to local_ingest: fauna +# Input source(s) +inputs: + - name: gisaid + metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst + sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst # For subtypes defined as build wildcards (e.g. "h5n1", "h5nx"), list out the subtype values # that we'll use to filter the starting metadata's 'subtype' column diff --git a/h5n1-cattle-outbreak/config.yaml b/h5n1-cattle-outbreak/config.yaml index d1d97f0..255c272 100644 --- a/h5n1-cattle-outbreak/config.yaml +++ b/h5n1-cattle-outbreak/config.yaml @@ -24,13 +24,11 @@ builds: target_patterns: - "auspice/avian-flu_{subtype}_{segment}.json" -#### Parameters which define the input source #### -s3_src: - name: ncbi - metadata: s3://nextstrain-data/files/workflows/avian-flu/h5n1/metadata.tsv.zst - sequences: s3://nextstrain-data/files/workflows/avian-flu/h5n1/{segment}/sequences.fasta.zst -local_ingest: false -# P.S. To use local ingest files, comment out s3_src and change to local_ingest: joined-ncbi (e.g.) +# Input source(s) +inputs: + - name: ncbi + metadata: s3://nextstrain-data/files/workflows/avian-flu/h5n1/metadata.tsv.zst + sequences: s3://nextstrain-data/files/workflows/avian-flu/h5n1/{segment}/sequences.fasta.zst # For subtypes defined as build wildcards (i.e. "h5n1-cattle-outbreak"), list out the subtype values # that we'll use to filter the starting metadata's 'subtype' column