Skip to content

Commit

Permalink
Multiple inputs / overriding inputs
Browse files Browse the repository at this point in the history
By having all phylogenetic workflows start from two lists of inputs
(`config.inputs`, `config.additional_inputs`) we enable a broad range of
uses with a consistent interface.

1. Using local ingest files is trivial (see added docs) and doesn't need
   a bunch of special-cased logic that is prone to falling out of date
   (as it had indeed done)
2. Adding extra / private data follows the similar pattern, with an
   additional config list being used so that we are explicit that the
   new data is additional and enforce an ordering which is needed for
   predictable `augur merge` behaviour. The canonical data can be
   removed / replaced via step (1) if needed.

I considered adding additional data after the subtype-filtering step,
which would avoid the need to add subtype in the metadata but requires
encoding this in the config overlay. I felt the chosen way was simpler
and more powerful.

When considering sequences the structure is more complex than metadata
because the influenza genome is segmented and we wish to allow users to
provide additional data for only some segments (see docstring for
`_parse_config_input`). For non-segmented pathogens the simpler
structure used here for metadata could also be used for sequences.

This workflow uses an old version of the CI workflow,
<https://github.com/nextstrain/.github/blob/v0/.github/workflows/pathogen-repo-ci.yaml#L233-L240>
which copies `example_data`. We could upgrade to the latest version
and use a config overlay to swap out the canonical inputs with the
example data.

Note that one of the side effects of the current implementation is that
merged inputs use the same filepath irrespective of the workflow. For
instance, both gisaid & h5n1-cattle-outbreak use the intermediate path
`results/metadata_merged.tsv`, which means it's not possible to maintain
runs of both those analysis concurrently if both were to use merged
inputs. Using separate analysis directories, e.g.
<#103> will help avoid this
shortcoming.
  • Loading branch information
jameshadfield committed Jan 7, 2025
1 parent a847af7 commit fde2126
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 62 deletions.
86 changes: 72 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,90 @@ Note that you may need to remove any existing data in `results/` in order for sn

#### Using locally ingested data (instead of downloading from S3)

Run the pipeline with `--config 'local_ingest=True'` to use the locally available files produced by the ingest pipeline (see `./ingest/README.md` for details on how to run).
Specifically, the files needed are `ingest/results/metadata.tsv` and `ingest/results/sequences_{SEGMENT}.fasta`.
If you have run the fauna ingest pipeline locally, you will have files such as
```
ingest/fauna/results/
├── metadata.tsv
├── sequences_ha.fasta
├── sequences_mp.fasta
├── sequences_na.fasta
├── sequences_np.fasta
├── sequences_ns.fasta
├── sequences_pa.fasta
├── sequences_pb1.fasta
└── sequences_pb2.fasta
```

Running with `--config 'inputs=[{"name": "local-ingest", "metadata": "ingest/fauna/results/metadata.tsv", "sequences": "ingest/fauna/results/sequences_{segment}.fasta"}]'` will switch the default S3 source to the local paths listed above.
Alternatively, you can use a config overlay YAML and supply it via `--configfile`:

```yaml
inputs:
- name: local-ingest
metadata: ingest/fauna/results/metadata.tsv
sequences: ingest/fauna/results/sequences_{segment}.fasta
```
### To modify this build to work with your own data
For the **h5n1-cattle-outbreak** the approach is similar, but you'll want to replace "fauna" with "joined-ncbi", "ncbi" or "andersen-lab", as appropriate.
## To modify this build to work with your own data
Although the simplest way to generate a custom build is via the quickstart build, you are welcome to clone this repo and use it as a starting point for running your own, local builds if you'd like. The [Nextstrain docs](https://docs.nextstrain.org/en/latest/index.html) are a fantastic resource for getting started with the Nextstrain pipeline, and include some [great tutorials](https://docs.nextstrain.org/en/latest/install.html) to get you started. This build is slightly more complicated than other builds, and has a few custom functions in it to accommodate all the features on [nextstrain.org](https://nextstrain.org/avian-flu), and makes use of wildcards for both subtypes and gene segments. If you'd like to adapt this full, non-simplified pipeline here to your own data (which you may want to do if you also want to annotate clades), you would need to make a few changes and additions:
#### 1. Data is stored on a private S3 bucket
### Use additional metadata and/or sequences
The phylogenetics pipeline starts by downloading data from a private S3 bucket.
If you don't have credentials for this bucket you can run the build using the example data provided in this repository.
Before running the build, copy the example sequences and metadata into the `data/` directory like so:
Using a config YAML provided via `--configfile`, or the same data via `--config` you can supply additional inputs.
For instance, this repo has a small set of example metadata + HA sequences.
We could add these to the default inputs via:

```bash
mkdir -p data/
cp -r -v example_data/* data/
```yaml
additional_inputs:
- name: example-data
metadata: example_data/gisaid/metadata.tsv
sequences:
ha: example_data/gisaid/sequences_ha.fasta
```

Which will merge `example_data/gisaid/metadata.tsv` with the default metadata, and add the sequences from `example_data/gisaid/sequences_ha.fasta` to the default HA sequences (all other segments will just use the default sequences).

If you had sequences for each segment you could use a wildcard for the segment like so:

```yaml
additional_inputs:
- name: example-data
metadata: example_data/gisaid/metadata.tsv
sequences: example_data/gisaid/sequences_{segment}.fasta
```

Then run the build with the test target, a H5N1 HA "all time" tree:

> NOTE: These added data will be subject to the same filtering rules as the starting data.
At a minimum, you'll want to ensure new sequences have metadata which defines their subtype and date, as filter steps will prune out those without valid values here.

> NOTE: Metadata merging is via `augur merge` and we add a column per input source to indicate the origin of data.
For instance the above example would have a `input_example-data` column with values of `1` for metadata rows which were included in `example_data/gisaid/metadata.tsv` and `0` otherwise.
You can use this for additional filtering commands as needed.


### Replace the starting metadata and sequences

Using the same approach as above but replacing `additional_inputs` with `inputs` will replace the default inputs.
See "Using locally ingested data (instead of downloading from S3)" (above) for an example of this.


### Run a single H5N1 HA all-time analysis

There is a `test_target` rule which will produce a single H5N1 HA "all-time" tree.
You can combine this with the example data (see above) to run a single build using a small number of sequences:


``` bash
snakemake test_target
snakemake --cores 2 \
--configfile config/gisaid.yaml \
--config 'inputs=[{"name": "example", "metadata": "example_data/gisaid/metadata.tsv", "sequences": "example_data/gisaid/sequences_{segment}.fasta"}]' \
-pf test_target
```

If you'd like to consistently run your own data, then you can place your fasta file in `data`. Alternatively, you can alter the `Snakefile` to remove references to S3 and add paths to your own files (see rules `download_sequences` and `download_metadata`).
### clade labeling

#### 2. clade labeling
If you'd like to run clade labeling, you will need to install [LABEL](https://wonder.cdc.gov/amd/flu/label/) yourself. This pipeline assumes that LABEL is located in `avian-flu/flu-amd/`, and should work if you install it into the `avian-flu` directory. If you do not need to label clades, then you can delete `rule add_h5_clade`, the function `metadata_by_wildcards`. You will need to make sure that all references to `metadata` in the pipeline are referencing `metadata_subtype_segment`, not `metadata-with-clade_subtype_segment`, which is generated by `rule add_h5_clade` and adds a new column to the metadata file with clade information.
163 changes: 129 additions & 34 deletions Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,6 @@ SEGMENTS = ["pb2", "pb1", "pa", "ha","np", "na", "mp", "ns"]
SAME_STRAINS = bool(config.get('same_strains_per_segment', False))

NEXTSTRAIN_PUBLIC_BUCKET = "s3://nextstrain-data/"
S3_SRC = config.get('s3_src', {})
LOCAL_INGEST = config.get('local_ingest', None)

def sanity_check_config():
assert LOCAL_INGEST or S3_SRC, "The config must define either 's3_src' or 'local_ingest'"
# NOTE: we could relax the following exclusivity of S3_SRC and LOCAL_INGEST
# if we want to use `--config local_ingest=gisaid` overrides.
assert not (S3_SRC and LOCAL_INGEST), "The config defined both 'local_ingest' and 's3_src', which are mutually exclusive"
if S3_SRC:
assert isinstance(S3_SRC, dict) and all([k in S3_SRC for k in ("name", "sequences", "metadata")]), \
"Config 's3_src' must be a dict with 'name', 'sequences' and 'metadata' keys"

sanity_check_config()

def collect_builds():
"""
Expand Down Expand Up @@ -87,43 +74,151 @@ def subtypes_by_subtype_wildcard(wildcards):
"`subtypes_by_subtype_wildcard` -- is there a typo in the subtype you are targetting?")
return(db[wildcards.subtype])

rule download_sequences:
class InvalidConfigError(Exception):
pass

# ------------- helper functions to collect, merge & download input files ------------------- #

def _parse_config_input(input):
"""
Parses information from an individual config-defined input, i.e. an element within `config.inputs` or `config.additional_inputs`
and returns information snakemake rules can use to obtain the underlying data.
The structure of `input` is a dictionary with keys:
- name:string (required)
- metadata:string (optional) - a s3 URI or a local file path
- sequences:string|dict[string,string] (optional) - either a s3 URI or a local file path, in which case
it must include a '{segment}' wildcard substring, or a dict of segment → s3 URI or local file path,
in which case it must not include the wildcard substring.
Returns a dictionary with optional keys:
- metadata:string - the relative path to the metadata file. If the original data was remote then this represents
the output of a rule which downloads the file
- metadata_location:string - the URI for the remote file if applicable else `None`
- sequences:function. Takes in wildcards and returns the relative path to the sequences FASTA for the provided
segment wildcard, or returns `None` if this input doesn't define sequences for the provided segment.
- sequences_location:function. Takes in wildcards and returns the URI for the remote file, or `None`, where applicable.
Raises InvalidConfigError
"""
name = input['name']
lambda_none = lambda w: None

info = {'metadata': None, 'metadata_location': None, 'sequences': lambda_none, 'sequences_location': lambda_none}

def _source(uri, *, s3, local):
if uri.startswith('s3://'):
return s3
elif uri.lower().startswith('http://') or uri.lower().startswith('https://'):
raise InvalidConfigError("Workflow cannot yet handle HTTP[S] inputs")
return local

if location:=input.get('metadata', False):
info['metadata'] = _source(location, s3=f"data/{name}/metadata.tsv", local=location)
info['metadata_location'] = _source(location, s3=location, local=None)

if location:=input.get('sequences', False):
if isinstance(location, dict):
info['sequences'] = lambda w: _source(location[w.segment], s3=f"data/{name}/sequences_{w.segment}.fasta", local=location[w.segment]) \
if w.segment in location \
else None
info['sequences_location'] = lambda w: _source(location[w.segment], s3=location[w.segment], local=None) \
if w.segment in location \
else None
elif isinstance(location, str):
info['sequences'] = _source(location, s3=lambda w: f"data/{name}/sequences_{w.segment}.fasta", local=lambda w: location.format(segment=w.segment))
info['sequences_location'] = _source(location, s3=lambda w: location.format(segment=w.segment), local=lambda_none)
else:
raise InvalidConfigError(f"Config input for {name} specifies sequences in an unknown format; must be dict or string")

return info


def _gather_inputs():
all_inputs = [*config['inputs'], *config.get('additional_inputs', [])]

if len(all_inputs)==0:
raise InvalidConfigError("Config must define at least one element in config.inputs or config.additional_inputs lists")
if not all([isinstance(i, dict) for i in all_inputs]):
raise InvalidConfigError("All of the elements in config.inputs and config.additional_inputs lists must be dictionaries"
"If you've used a command line '--config' double check your quoting.")
if len({i['name'] for i in all_inputs})!=len(all_inputs):
raise InvalidConfigError("Names of inputs (config.inputs and config.additional_inputs) must be unique")
if not all(['name' in i and ('sequences' in i or 'metadata' in i) for i in all_inputs]):
raise InvalidConfigError("Each input (config.inputs and config.additional_inputs) must have a 'name' and 'metadata' and/or 'sequences'")

return {i['name']: _parse_config_input(i) for i in all_inputs}

input_sources = _gather_inputs()

def input_metadata(wildcards):
inputs = [info['metadata'] for info in input_sources.values() if info.get('metadata', None)]
return inputs[0] if len(inputs)==1 else "results/metadata_merged.tsv"

def input_sequences(wildcards):
inputs = list(filter(None, [info['sequences'](wildcards) for info in input_sources.values() if info.get('sequences', None)]))
return inputs[0] if len(inputs)==1 else "results/sequences_merged_{segment}.fasta"

rule download_s3_sequences:
output:
sequences = f"data/{S3_SRC.get('name', None)}/sequences_{{segment}}.fasta",
sequences = "data/{input_name}/sequences_{segment}.fasta",
params:
address=lambda w: S3_SRC.get('sequences', None).format(segment=w.segment),
no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('sequences', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else ""
address = lambda w: input_sources[w.input_name]['sequences_location'](w),
no_sign_request=lambda w: "--no-sign-request" \
if input_sources[w.input_name]['sequences_location'](w).startswith(NEXTSTRAIN_PUBLIC_BUCKET) \
else "",
shell:
"""
aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.sequences}
"""

rule download_metadata:
rule download_s3_metadata:
output:
metadata = f"data/{S3_SRC.get('name', None)}/metadata.tsv",
metadata = "data/{input_name}/metadata.tsv",
params:
address=S3_SRC.get('metadata', None),
no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('metadata', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else ""
address = lambda w: input_sources[w.input_name]['metadata_location'],
no_sign_request=lambda w: "--no-sign-request" \
if input_sources[w.input_name]['metadata_location'].startswith(NEXTSTRAIN_PUBLIC_BUCKET) \
else "",
shell:
"""
aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.metadata}
"""

rule merge_metadata:
"""
This rule should only be invoked if there are multiple defined metadata inputs
(config.inputs + config.additional_inputs)
"""
input:
**{name: info['metadata'] for name,info in input_sources.items() if info.get('metadata', None)}
params:
metadata = lambda w, input: list(map("=".join, input.items()))
output:
metadata = "results/metadata_merged.tsv"
shell:
r"""
augur merge \
--metadata {params.metadata:q} \
--source-columns 'input_{{NAME}}' \
--output-metadata {output.metadata}
"""

def input_metadata(wildcards):
if S3_SRC:
return f"data/{S3_SRC['name']}/metadata.tsv",
elif LOCAL_INGEST:
return f"ingest/{LOCAL_INGEST}/results/metadata.tsv",
raise Exception() # already caught by `sanity_check_config` above, this is just being cautious

def input_sequences(wildcards):
if S3_SRC:
return f"data/{S3_SRC['name']}/sequences_{wildcards.segment}.fasta",
elif LOCAL_INGEST:
return f"ingest/{LOCAL_INGEST}/results/sequences_{wildcards.segment}.fasta"
raise Exception() # already caught by `sanity_check_config` above, this is just being cautious
rule merge_sequences:
"""
This rule should only be invoked if there are multiple defined metadata inputs
(config.inputs + config.additional_inputs) for this particular segment
"""
input:
lambda w: list(filter(None, [info['sequences'](w) for info in input_sources.values()]))
output:
sequences = "results/sequences_merged_{segment}.fasta"
shell:
r"""
seqkit rmdup {input:q} > {output.sequences:q}
"""

# -------------------------------------------------------------------------------------------- #

rule filter_sequences_by_subtype:
input:
Expand Down
12 changes: 5 additions & 7 deletions config/gisaid.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ segments:
- mp
- ns

#### Parameters which define the input source ####
s3_src:
name: gisaid
metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst
sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst
local_ingest: false
# P.S. To use local ingest files, comment out s3_src and change to local_ingest: fauna
# Input source(s) - See README.md for how to use local files instead and/or add additional inputs
inputs:
- name: gisaid
metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst
sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst


#### Parameters which control large overarching aspects of the build
Expand Down
13 changes: 6 additions & 7 deletions config/h5n1-cattle-outbreak.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ segments:
- ns


#### Parameters which define the input source ####
s3_src:
name: ncbi
metadata: s3://nextstrain-data/files/workflows/avian-flu/h5n1/metadata.tsv.zst
sequences: s3://nextstrain-data/files/workflows/avian-flu/h5n1/{segment}/sequences.fasta.zst
local_ingest: false
# P.S. To use local ingest files, comment out s3_src and change to local_ingest: joined-ncbi (e.g.)
# Input source(s) - See README.md for how to use local files instead and/or add additional inputs
inputs:
- name: ncbi
metadata: s3://nextstrain-data/files/workflows/avian-flu/h5n1/metadata.tsv.zst
sequences: s3://nextstrain-data/files/workflows/avian-flu/h5n1/{segment}/sequences.fasta.zst



#### Parameters which control large overarching aspects of the build
Expand Down

0 comments on commit fde2126

Please sign in to comment.