diff --git a/README.md b/README.md index 4ec2f54..86049f0 100755 --- a/README.md +++ b/README.md @@ -108,32 +108,90 @@ Note that you may need to remove any existing data in `results/` in order for sn #### Using locally ingested data (instead of downloading from S3) -Run the pipeline with `--config 'local_ingest=True'` to use the locally available files produced by the ingest pipeline (see `./ingest/README.md` for details on how to run). -Specifically, the files needed are `ingest/results/metadata.tsv` and `ingest/results/sequences_{SEGMENT}.fasta`. +If you have run the fauna ingest pipeline locally, you will have files such as +``` +ingest/fauna/results/ +├── metadata.tsv +├── sequences_ha.fasta +├── sequences_mp.fasta +├── sequences_na.fasta +├── sequences_np.fasta +├── sequences_ns.fasta +├── sequences_pa.fasta +├── sequences_pb1.fasta +└── sequences_pb2.fasta +``` + +Running with `--config 'inputs=[{"name": "local-ingest", "metadata": "ingest/fauna/results/metadata.tsv", "sequences": "ingest/fauna/results/sequences_{segment}.fasta"}]'` will switch the default S3 source to the local paths listed above. +Alternatively, you can use a config overlay YAML and supply it via `--configfile`: + +```yaml +inputs: + - name: local-ingest + metadata: ingest/fauna/results/metadata.tsv + sequences: ingest/fauna/results/sequences_{segment}.fasta +``` -### To modify this build to work with your own data +For the **h5n1-cattle-outbreak** the approach is similar, but you'll want to replace "fauna" with "joined-ncbi", "ncbi" or "andersen-lab", as appropriate. + +## To modify this build to work with your own data Although the simplest way to generate a custom build is via the quickstart build, you are welcome to clone this repo and use it as a starting point for running your own, local builds if you'd like. The [Nextstrain docs](https://docs.nextstrain.org/en/latest/index.html) are a fantastic resource for getting started with the Nextstrain pipeline, and include some [great tutorials](https://docs.nextstrain.org/en/latest/install.html) to get you started. This build is slightly more complicated than other builds, and has a few custom functions in it to accommodate all the features on [nextstrain.org](https://nextstrain.org/avian-flu), and makes use of wildcards for both subtypes and gene segments. If you'd like to adapt this full, non-simplified pipeline here to your own data (which you may want to do if you also want to annotate clades), you would need to make a few changes and additions: -#### 1. Data is stored on a private S3 bucket +### Use additional metadata and/or sequences -The phylogenetics pipeline starts by downloading data from a private S3 bucket. -If you don't have credentials for this bucket you can run the build using the example data provided in this repository. -Before running the build, copy the example sequences and metadata into the `data/` directory like so: +Using a config YAML provided via `--configfile`, or the same data via `--config` you can supply additional inputs. +For instance, this repo has a small set of example metadata + HA sequences. +We could add these to the default inputs via: -```bash -mkdir -p data/ -cp -r -v example_data/* data/ +```yaml +additional_inputs: + - name: example-data + metadata: example_data/gisaid/metadata.tsv + sequences: + ha: example_data/gisaid/sequences_ha.fasta +``` + +Which will merge `example_data/gisaid/metadata.tsv` with the default metadata, and add the sequences from `example_data/gisaid/sequences_ha.fasta` to the default HA sequences (all other segments will just use the default sequences). + +If you had sequences for each segment you could use a wildcard for the segment like so: + +```yaml +additional_inputs: + - name: example-data + metadata: example_data/gisaid/metadata.tsv + sequences: example_data/gisaid/sequences_{segment}.fasta ``` -Then run the build with the test target, a H5N1 HA "all time" tree: + +> NOTE: These added data will be subject to the same filtering rules as the starting data. + At a minimum, you'll want to ensure new sequences have metadata which defines their subtype and date, as filter steps will prune out those without valid values here. + +> NOTE: Metadata merging is via `augur merge` and we add a column per input source to indicate the origin of data. + For instance the above example would have a `input_example-data` column with values of `1` for metadata rows which were included in `example_data/gisaid/metadata.tsv` and `0` otherwise. + You can use this for additional filtering commands as needed. + + +### Replace the starting metadata and sequences + +Using the same approach as above but replacing `additional_inputs` with `inputs` will replace the default inputs. +See "Using locally ingested data (instead of downloading from S3)" (above) for an example of this. + + +### Run a single H5N1 HA all-time analysis + +There is a `test_target` rule which will produce a single H5N1 HA "all-time" tree. +You can combine this with the example data (see above) to run a single build using a small number of sequences: + ``` bash -snakemake test_target +snakemake --cores 2 \ + --configfile config/gisaid.yaml \ + --config 'inputs=[{"name": "example", "metadata": "example_data/gisaid/metadata.tsv", "sequences": "example_data/gisaid/sequences_{segment}.fasta"}]' \ + -pf test_target ``` -If you'd like to consistently run your own data, then you can place your fasta file in `data`. Alternatively, you can alter the `Snakefile` to remove references to S3 and add paths to your own files (see rules `download_sequences` and `download_metadata`). +### clade labeling -#### 2. clade labeling If you'd like to run clade labeling, you will need to install [LABEL](https://wonder.cdc.gov/amd/flu/label/) yourself. This pipeline assumes that LABEL is located in `avian-flu/flu-amd/`, and should work if you install it into the `avian-flu` directory. If you do not need to label clades, then you can delete `rule add_h5_clade`, the function `metadata_by_wildcards`. You will need to make sure that all references to `metadata` in the pipeline are referencing `metadata_subtype_segment`, not `metadata-with-clade_subtype_segment`, which is generated by `rule add_h5_clade` and adds a new column to the metadata file with clade information. diff --git a/Snakefile b/Snakefile index 0a9fa55..6ee0bae 100755 --- a/Snakefile +++ b/Snakefile @@ -15,19 +15,6 @@ SEGMENTS = ["pb2", "pb1", "pa", "ha","np", "na", "mp", "ns"] SAME_STRAINS = bool(config.get('same_strains_per_segment', False)) NEXTSTRAIN_PUBLIC_BUCKET = "s3://nextstrain-data/" -S3_SRC = config.get('s3_src', {}) -LOCAL_INGEST = config.get('local_ingest', None) - -def sanity_check_config(): - assert LOCAL_INGEST or S3_SRC, "The config must define either 's3_src' or 'local_ingest'" - # NOTE: we could relax the following exclusivity of S3_SRC and LOCAL_INGEST - # if we want to use `--config local_ingest=gisaid` overrides. - assert not (S3_SRC and LOCAL_INGEST), "The config defined both 'local_ingest' and 's3_src', which are mutually exclusive" - if S3_SRC: - assert isinstance(S3_SRC, dict) and all([k in S3_SRC for k in ("name", "sequences", "metadata")]), \ - "Config 's3_src' must be a dict with 'name', 'sequences' and 'metadata' keys" - -sanity_check_config() def collect_builds(): """ @@ -87,43 +74,151 @@ def subtypes_by_subtype_wildcard(wildcards): "`subtypes_by_subtype_wildcard` -- is there a typo in the subtype you are targetting?") return(db[wildcards.subtype]) -rule download_sequences: +class InvalidConfigError(Exception): + pass + +# ------------- helper functions to collect, merge & download input files ------------------- # + +def _parse_config_input(input): + """ + Parses information from an individual config-defined input, i.e. an element within `config.inputs` or `config.additional_inputs` + and returns information snakemake rules can use to obtain the underlying data. + + The structure of `input` is a dictionary with keys: + - name:string (required) + - metadata:string (optional) - a s3 URI or a local file path + - sequences:string|dict[string,string] (optional) - either a s3 URI or a local file path, in which case + it must include a '{segment}' wildcard substring, or a dict of segment → s3 URI or local file path, + in which case it must not include the wildcard substring. + + Returns a dictionary with optional keys: + - metadata:string - the relative path to the metadata file. If the original data was remote then this represents + the output of a rule which downloads the file + - metadata_location:string - the URI for the remote file if applicable else `None` + - sequences:function. Takes in wildcards and returns the relative path to the sequences FASTA for the provided + segment wildcard, or returns `None` if this input doesn't define sequences for the provided segment. + - sequences_location:function. Takes in wildcards and returns the URI for the remote file, or `None`, where applicable. + + Raises InvalidConfigError + """ + name = input['name'] + lambda_none = lambda w: None + + info = {'metadata': None, 'metadata_location': None, 'sequences': lambda_none, 'sequences_location': lambda_none} + + def _source(uri, *, s3, local): + if uri.startswith('s3://'): + return s3 + elif uri.lower().startswith(r'http[s]:\/\/'): + raise InvalidConfigError("Workflow cannot yet handle HTTP[S] inputs") + return local + + if location:=input.get('metadata', False): + info['metadata'] = _source(location, s3=f"data/{name}/metadata.tsv", local=location) + info['metadata_location'] = _source(location, s3=location, local=None) + + if location:=input.get('sequences', False): + if isinstance(location, dict): + info['sequences'] = lambda w: _source(location[w.segment], s3=f"data/{name}/sequences_{w.segment}.fasta", local=location[w.segment]) \ + if w.segment in location \ + else None + info['sequences_location'] = lambda w: _source(location[w.segment], s3=location[w.segment], local=None) \ + if w.segment in location \ + else None + elif isinstance(location, str): + info['sequences'] = _source(location, s3=lambda w: f"data/{name}/sequences_{w.segment}.fasta", local=lambda w: location.format(segment=w.segment)) + info['sequences_location'] = _source(location, s3=lambda w: location.format(segment=w.segment), local=lambda_none) + else: + raise InvalidConfigError(f"Config input for {name} specifies sequences in an unknown format; must be dict or string") + + return info + + +def _gather_inputs(): + all_inputs = [*config['inputs'], *config.get('additional_inputs', [])] + + if len(all_inputs)==0: + raise InvalidConfigError("Config must define at least one element in config.inputs or config.additional_inputs lists") + if not all([isinstance(i, dict) for i in all_inputs]): + raise InvalidConfigError("All of the elements in config.inputs and config.additional_inputs lists must be dictionaries" + "If you've used a command line '--config' double check your quoting.") + if len({i['name'] for i in all_inputs})!=len(all_inputs): + raise InvalidConfigError("Names of inputs (config.inputs and config.additional_inputs) must be unique") + if not all(['name' in i and ('sequences' in i or 'metadata' in i) for i in all_inputs]): + raise InvalidConfigError("Each input (config.inputs and config.additional_inputs) must have a 'name' and 'metadata' and/or 'sequences'") + + return {i['name']: _parse_config_input(i) for i in all_inputs} + +input_sources = _gather_inputs() + +def input_metadata(wildcards): + inputs = [info['metadata'] for info in input_sources.values() if info.get('metadata', None)] + return inputs[0] if len(inputs)==1 else "results/metadata_merged.tsv" + +def input_sequences(wildcards): + inputs = list(filter(None, [info['sequences'](wildcards) for info in input_sources.values() if info.get('sequences', None)])) + return inputs[0] if len(inputs)==1 else "results/sequences_merged_{segment}.fasta" + +rule download_s3_sequences: output: - sequences = f"data/{S3_SRC.get('name', None)}/sequences_{{segment}}.fasta", + sequences = "data/{input_name}/sequences_{segment}.fasta", params: - address=lambda w: S3_SRC.get('sequences', None).format(segment=w.segment), - no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('sequences', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else "" + address = lambda w: input_sources[w.input_name]['sequences_location'](w), + no_sign_request=lambda w: "--no-sign-request" \ + if input_sources[w.input_name]['sequences_location'](w).startswith(NEXTSTRAIN_PUBLIC_BUCKET) \ + else "", shell: """ aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.sequences} """ -rule download_metadata: +rule download_s3_metadata: output: - metadata = f"data/{S3_SRC.get('name', None)}/metadata.tsv", + metadata = "data/{input_name}/metadata.tsv", params: - address=S3_SRC.get('metadata', None), - no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('metadata', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else "" + address = lambda w: input_sources[w.input_name]['metadata_location'], + no_sign_request=lambda w: "--no-sign-request" \ + if input_sources[w.input_name]['metadata_location'].startswith(NEXTSTRAIN_PUBLIC_BUCKET) \ + else "", shell: """ aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.metadata} """ +rule merge_metadata: + """ + This rule should only be invoked if there are multiple defined metadata inputs + (config.inputs + config.additional_inputs) + """ + input: + **{name: info['metadata'] for name,info in input_sources.items() if info.get('metadata', None)} + params: + metadata = lambda w, input: list(map("=".join, input.items())) + output: + metadata = "results/metadata_merged.tsv" + shell: + r""" + augur merge \ + --metadata {params.metadata:q} \ + --source-columns 'input_{{NAME}}' \ + --output-metadata {output.metadata} + """ -def input_metadata(wildcards): - if S3_SRC: - return f"data/{S3_SRC['name']}/metadata.tsv", - elif LOCAL_INGEST: - return f"ingest/{LOCAL_INGEST}/results/metadata.tsv", - raise Exception() # already caught by `sanity_check_config` above, this is just being cautious - -def input_sequences(wildcards): - if S3_SRC: - return f"data/{S3_SRC['name']}/sequences_{wildcards.segment}.fasta", - elif LOCAL_INGEST: - return f"ingest/{LOCAL_INGEST}/results/sequences_{wildcards.segment}.fasta" - raise Exception() # already caught by `sanity_check_config` above, this is just being cautious +rule merge_sequences: + """ + This rule should only be invoked if there are multiple defined metadata inputs + (config.inputs + config.additional_inputs) for this particular segment + """ + input: + lambda w: list(filter(None, [info['sequences'](w) for info in input_sources.values()])) + output: + sequences = "results/sequences_merged_{segment}.fasta" + shell: + r""" + seqkit rmdup {input:q} > {output.sequences:q} + """ +# -------------------------------------------------------------------------------------------- # rule filter_sequences_by_subtype: input: diff --git a/config/gisaid.yaml b/config/gisaid.yaml index 284d36e..9765859 100644 --- a/config/gisaid.yaml +++ b/config/gisaid.yaml @@ -21,13 +21,11 @@ segments: - mp - ns -#### Parameters which define the input source #### -s3_src: - name: gisaid - metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst - sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst -local_ingest: false -# P.S. To use local ingest files, comment out s3_src and change to local_ingest: fauna +# Input source(s) - See README.md for how to use local files instead and/or add additional inputs +inputs: + - name: gisaid + metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst + sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst #### Parameters which control large overarching aspects of the build diff --git a/config/h5n1-cattle-outbreak.yaml b/config/h5n1-cattle-outbreak.yaml index bd0d396..13f6505 100644 --- a/config/h5n1-cattle-outbreak.yaml +++ b/config/h5n1-cattle-outbreak.yaml @@ -20,13 +20,12 @@ segments: - ns -#### Parameters which define the input source #### -s3_src: - name: ncbi - metadata: s3://nextstrain-data/files/workflows/avian-flu/h5n1/metadata.tsv.zst - sequences: s3://nextstrain-data/files/workflows/avian-flu/h5n1/{segment}/sequences.fasta.zst -local_ingest: false -# P.S. To use local ingest files, comment out s3_src and change to local_ingest: joined-ncbi (e.g.) +# Input source(s) - See README.md for how to use local files instead and/or add additional inputs +inputs: + - name: ncbi + metadata: s3://nextstrain-data/files/workflows/avian-flu/h5n1/metadata.tsv.zst + sequences: s3://nextstrain-data/files/workflows/avian-flu/h5n1/{segment}/sequences.fasta.zst + #### Parameters which control large overarching aspects of the build