diff --git a/Cargo.toml b/Cargo.toml index 20967af..e730861 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "oarfish" -version = "0.5.1" +version = "0.6.0" edition = "2021" authors = [ "Zahra Zare Jousheghani ", @@ -28,23 +28,20 @@ categories = ["command-line-utilities", "science"] seqcol_rs = { git = "https://github.com/COMBINE-lab/seqcol_rs", branch = "dev", version = "0.1.0" } anyhow = "1.0.86" bio-types = { version = "1.0.4", features = ["serde"] } -clap = { version = "4.5.13", features = ["derive"] } -coitrees = "0.4.0" +clap = { version = "4.5.16", features = ["derive"] } noodles-bam = "0.66.0" -noodles-gtf = "0.30.0" noodles-sam = "0.63.0" num-format = "0.4.4" tabled = "0.16.0" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } -typed-builder = "0.19.1" +typed-builder = "0.20.0" rayon = "1.10" statrs = "0.17" csv = "1.3" -ndarray = "0.16" serde = { version = "1", features = ["derive"] } itertools = "0.13.0" -serde_json = "1.0.122" +serde_json = "1.0.127" path-tools = "0.1.0" atomic_float = "1.0.0" rand = "0.8.5" @@ -56,6 +53,18 @@ arrow2 = { version = "0.18.0", features = [ ] } kders = { git = "https://github.com/COMBINE-lab/kde-rs.git", branch = "dev", version = "0.1.0" } noodles-bgzf = { version = "0.32.0" } +crossbeam = { version = "0.8.4", features = [ + "crossbeam-queue", + "crossbeam-channel", +] } +sprs = "0.11.1" +minimap2-sys = { version = "0.1.19" } +# rely on minimap2-temp until upstream version is pushed +minimap2-temp = { version = "0.1.30" } +#git = "https://github.com/rob-p/minimap2-rs.git", branch = "alignment-score" } +#git = "https://github.com/jguhlin/minimap2-rs.git", branch = "alignment-score" } +needletail = "0.5.1" +indicatif = "0.17.8" [[bin]] name = "oarfish" diff --git a/docs/index.md b/docs/index.md index db14513..2b884ae 100644 --- a/docs/index.md +++ b/docs/index.md @@ -52,26 +52,43 @@ filters: only alignments to this strand will be allowed; options are (fw /+, rc/-, or both/.) [default: .] ``` -The input should be a `bam` format file, with reads aligned using [`minimap2`](https://github.com/lh3/minimap2) against the _transcriptome_. That is, `oarfish` does not currently handle spliced alignment to the genome. Further, the output alignments should be name sorted (the default order produced by `minimap2` should be fine). _Specifically_, `oarfish` relies on the existence of the `AS` tag in the `bam` records that encodes the alignment score in order to obtain the score for each alignment (which is used in probabilistic read assignment), and the score of the best alignment, overall, for each read. The parameters above should be explained by their relevant help option, but the -`-d`/`--strand-filter` is worth noting explicitly. By default, alignments to both strands of a transcript will be considered valid. You can use this option to allow only alignments in the specified orientation; for example -`-d fw` will allow only alignments in the forward orientation and `-d rc` will allow only alignments in the reverse-complement orientation and `-d both` (the default) will allow both. The `-d` filter, if explicitly provided, overrides -the orientation filter in any provided "filter group" so e.g. passing `--filter-group no-filters -d fw` will disable other filters, but will still only admit alignments in the forward orientation. 
-### Choosing `minimap2` alignment options
+## Input to `oarfish`
 
-Since the purpose of `oarfish` is to estimate transcript abundance from a collection of alignments to the target transcriptome, it is important that the alignments are generated
-in a fashion that is compatible with this goal. Primarily, this means that the aligner should be configured to report as many optimal (and near-optimal) alignments as exist, so that
-`oarfish` can observe all of this information and determine how to allocate reads to transcripts. We recommend using the following options with `minimap2` when aligning data for
-later processing by `oarfish`
+`Oarfish` can accept as input either a `bam` file containing reads aligned to the transcriptome as specified [below](index.md#alignment-based-input), or
+raw sequencing reads themselves (along with a reference transcriptome), which are then mapped to the reference using [minimap2-rs](https://github.com/jguhlin/minimap2-rs)
+and subsequently processed with `oarfish`. With equivalent alignment options, the results of these two input modes should be equivalent, so the choice between them
+is largely a matter of user preference.
 
- * For ONT data (either dRNA or cDNA): please use the flags `--eqx -N 100 -ax map-ont`
- * For PacBio data: please use the flags `--eqx -N 100 -ax pacbio`
-**Note (1)**: It may be worthwile using an even larger `N` value (e.g. the [TranSigner manuscript](https://www.biorxiv.org/content/10.1101/2024.04.13.589356v1.full) recommends `-N 181`). A larger value should not diminish the
-accuracy of `oarfish`, but it may make alignment take longer and produce a larger `bam` file.
+### Read-based input
 
-**Note (2)**: For very high quality PacBio data, it may be most appropriate to use the `-ax map-hifi` flag in place of `-ax pacbio`. We are currently evaluating the effect of this option, and also welcome feedback if you
-have experiences to share on the use of data aligned with these different flags with `oarfish`.
+The read-based input mode takes as input a reference (specified with the `--reference` argument), which can be either a `FASTA` file containing a transcriptome reference
+or a pre-built `minimap2` index, as well as a set of reads (specified with the `--reads` argument), and a `--seq-tech` argument specifying the sequencing technology
+type of the reads to be mapped.
+
+The mapping between the potential values that can be passed to `oarfish`'s `--seq-tech` argument and the `minimap2` presets is as follows:
+
+ - `oarfish` seq-tech `ont-cdna` corresponds to `minimap2` preset `map-ont`
+ - `oarfish` seq-tech `ont-drna` corresponds to `minimap2` preset `map-ont`
+ - `oarfish` seq-tech `pac-bio` corresponds to `minimap2` preset `map-pb`
+ - `oarfish` seq-tech `pac-bio-hifi` corresponds to `minimap2` preset `map-hifi`
+
+Given these inputs, `oarfish` will either load the pre-built `minimap2` index, or build one according to the parameter specified by `--seq-tech`, and will then align
+the reads to this index using [`minimap2-rs`](https://github.com/jguhlin/minimap2-rs). Optionally, the maximum multimapping rate (i.e. the number of secondary alignments
+corresponding to the `minimap2` parameter `-N`) can be specified with the command line parameter `--best-n`. The default value of this parameter is 100.
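+
+For example (the file names here are placeholders), a read-based quantification run might look like:
+
+```
+oarfish --reads reads.fastq --reference transcripts.fa --seq-tech ont-cdna --output quant
+```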
+
+### Alignment-based input
+
+In alignment-based mode, `oarfish` processes pre-computed alignments of the reads to the transcriptome.
+The input should be a `bam` format file, with reads aligned using [`minimap2`](https://github.com/lh3/minimap2) against the _transcriptome_. That is, `oarfish` does not currently handle spliced alignment to the genome. Further, the output alignments should be name sorted (the default order produced by `minimap2` should be fine). _Specifically_, `oarfish` relies on the existence of the `AS` tag in the `bam` records that encodes the alignment score in order to obtain the score for each alignment (which is used in probabilistic read assignment), and the score of the best alignment, overall, for each read.
+
+### Choosing `minimap2` alignment options
+
+Since the purpose of `oarfish` is to estimate transcript abundance from a collection of alignments to the target transcriptome, it is important that the alignments are generated in a fashion that is compatible with this goal. Primarily, this means that the aligner should be configured to report as many optimal (and near-optimal) alignments as exist, so that `oarfish` can observe all of this information and determine how to allocate reads to transcripts. We recommend using the following options with `minimap2` when aligning data for later processing by `oarfish`:
+
+ * For ONT data (either dRNA or cDNA): please use the flags `--eqx -N 100 -ax map-ont`
+ * For PacBio data: please use the flags `--eqx -N 100 -ax pacbio`
+
+**Note (1)**: It may be worthwhile using an even larger `N` value (e.g. the [TranSigner manuscript](https://www.biorxiv.org/content/10.1101/2024.04.13.589356v1.full) recommends `-N 181`). A larger value should not diminish the accuracy of `oarfish`, but it may make alignment take longer and produce a larger `bam` file.
+
+**Note (2)**: For very high quality PacBio data, it may be most appropriate to use the `-ax map-hifi` flag in place of `-ax pacbio`. We are currently evaluating the effect of this option, and also welcome feedback if you have experiences to share on the use of data aligned with these different flags with `oarfish`.
+
+### Other notes on `oarfish` parameters
+
+The parameters above should be explained by their relevant help option, but the `-d`/`--strand-filter` is worth noting explicitly. By default, alignments to both strands of a transcript will be considered valid. You can use this option to allow only alignments in the specified orientation; for example, `-d fw` will allow only alignments in the forward orientation, `-d rc` will allow only alignments in the reverse-complement orientation, and `-d both` (the default) will allow both. The `-d` filter, if explicitly provided, overrides the orientation filter in any provided "filter group", so e.g. passing `--filter-group no-filters -d fw` will disable other filters, but will still only admit alignments in the forward orientation.
+
+**In general**, if you apply a `filter-group`, the group options will be applied first, and then any explicitly provided options will override the corresponding options in the `filter-group`.
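+
+For example, the following alignment-based invocation (again with placeholder file names) disables the default filters but still admits only forward-orientation alignments:
+
+```
+oarfish --alignments aln.bam --output quant --filter-group no-filters -d fw
+```
+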
 ### Inferential Replicates

diff --git a/src/alignment_parser.rs b/src/alignment_parser.rs
index 37f50bc..f0f09b3 100644
--- a/src/alignment_parser.rs
+++ b/src/alignment_parser.rs
@@ -1,10 +1,11 @@
 use crate::util::oarfish_types::{InMemoryAlignmentStore, TranscriptInfo};
 use noodles_bam as bam;
-use noodles_sam::Header;
+use noodles_sam::header::record::value::map::tag;
+use noodles_sam::{alignment::RecordBuf, Header};
 use num_format::{Locale, ToFormattedString};
 use std::io;
 use std::path::Path;
-use tracing::{info, trace};
+use tracing::{error, info};
 
 pub fn read_and_verify_header(
     reader: &mut bam::io::Reader,
         .to_formatted_string(&Locale::en)
     );
 
+    // sort order tag
+    let so = tag::Other::try_from([b'S', b'O'])?;
+    let so_type = header
+        .header()
+        .expect("has inner header")
+        .other_fields()
+        .get(&so)
+        .expect("BAM file should have @SO field");
+
+    if so_type == "coordinate" {
+        error!("oarfish is not designed to process coordinate sorted BAM files.");
+        anyhow::bail!(
+            "You provided a coordinate-sorted BAM, but oarfish does not support processing these.
+            You should provide a BAM file collated by record name (which is the \"natural\" minimap2 order)."
+        );
+    }
+
     let mut saw_minimap2 = false;
     let mut progs = vec![];
     // explicitly check that alignment was done with a supported
@@ -42,11 +60,174 @@
     Ok(header)
 }
 
+pub enum NextAction {
+    SkipUnmapped,
+    ProcessSameBarcode,
+    NewBarcode,
+    EndOfFile,
+}
+
+#[inline(always)]
+fn is_same_barcode(rec: &RecordBuf, current_barcode: &[u8]) -> anyhow::Result<bool> {
+    const CB_TAG: [u8; 2] = [b'C', b'B'];
+    let same_barcode = match rec.data().get(&CB_TAG) {
+        None => anyhow::bail!("could not get CB tag value"),
+        Some(v) => match v {
+            noodles_sam::alignment::record_buf::data::field::Value::String(x) => {
+                x.as_slice() == current_barcode
+            }
+            _ => anyhow::bail!("CB tag value had unexpected type!"),
+        },
+    };
+    Ok(same_barcode)
+}
+
+/// Takes a collection of [RecordBuf]s, a mutable reference to an [InMemoryAlignmentStore]
+/// as well as the relevant [TranscriptInfo].
+///
+/// This function first sorts the `records` by the read name (ensuring that any primary alignment
+/// of the read comes first), so that reads are collated by name. Then, it processes in turn the
+/// alignments for each input read, filtering them according to the filters attached to the
+/// `store`. Subsequently, the alignments are summarized in the `store`. If all `records` are
+/// processed successfully, [Ok]`()` is returned, otherwise the relevant [anyhow::Error] is
+/// returned.
+pub fn sort_and_parse_barcode_records(
+    records: &mut Vec<RecordBuf>,
+    store: &mut InMemoryAlignmentStore,
+    txps: &mut [TranscriptInfo],
+    records_for_read: &mut Vec<RecordBuf>,
+) -> anyhow::Result<()> {
+    records_for_read.clear();
+    let mut prev_read = String::new();
+
+    // first sort records by read name
+    records.sort_unstable_by(|x, y| match x.name().cmp(&y.name()) {
+        std::cmp::Ordering::Equal => {
+            match (x.flags().is_secondary(), y.flags().is_secondary()) {
+                (false, true) => std::cmp::Ordering::Less,
+                (true, false) => std::cmp::Ordering::Greater,
+                (true, true) => std::cmp::Ordering::Equal,
+                // this one shouldn't happen
+                (false, false) => std::cmp::Ordering::Equal,
+            }
+        }
+        x => x,
+    });
+
+    // Parse the input alignment file, gathering the alignments aggregated
+    // by their source read. **Note**: this requires that we have a
+    // name-sorted input bam file (currently, aligned against the transcriptome).
+    //
+    // *NOTE*: this had to be changed from `records` to `record_bufs` or
+    // critical information was missing from the records. This happened when
+    // moving to the new version of noodles. Track `https://github.com/zaeleus/noodles/issues/230`
+    // to see if it's clear why this is the case
+    for record in records {
+        if let Some(rname) = record.name() {
+            let record_copy = record.clone();
+            let rstring: String = String::from_utf8_lossy(rname.as_ref()).into_owned();
+            // if this is an alignment for the same read, then
+            // push it onto our temporary vector.
+            if prev_read == rstring {
+                if let Some(_ref_id) = record.reference_sequence_id() {
+                    records_for_read.push(record_copy);
+                }
+            } else {
+                // otherwise, record the alignment range for the
+                // previous read record.
+                if !prev_read.is_empty() {
+                    store.add_group(txps, records_for_read);
+                    if records_for_read.len() == 1 {
+                        store.inc_unique_alignments();
+                    }
+                    records_for_read.clear();
+                }
+                // the new "prev_read" name is the current read name
+                // so it becomes the first on the new alignment range
+                // vector.
+                prev_read = rstring;
+                if let Some(_ref_id) = record.reference_sequence_id() {
+                    records_for_read.push(record_copy);
+                }
+            }
+        }
+    }
+    // if we end with a non-empty alignment range vector, then
+    // add that group.
+    if !records_for_read.is_empty() {
+        store.add_group(txps, records_for_read);
+        if records_for_read.len() == 1 {
+            store.inc_unique_alignments();
+        }
+        records_for_read.clear();
+    }
+    Ok(())
+}
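As an aside on how these helpers fit together: a driver (the actual one lives in `single_cell.rs` and is not shown in full in this patch) is expected to peel records off a collated BAM one barcode at a time with `parse_alignments_for_barcode` (defined just below), then collate-and-filter each batch with `sort_and_parse_barcode_records`. A minimal sketch, where `iter`, `store`, and `txps` are assumed to exist and `get_barcode_bytes` is a hypothetical helper that extracts the `CB` tag bytes:

```rust
// Hypothetical driver sketch (not part of this patch): drain a
// name-collated BAM barcode by barcode.
let mut records_for_read = Vec::new();
while let Some(Ok(rec)) = iter.peek() {
    // barcode that opens the next group of records (made-up helper)
    let cb: Vec<u8> = get_barcode_bytes(rec)?;
    let mut recs = parse_alignments_for_barcode(&mut iter, &cb)?;
    sort_and_parse_barcode_records(&mut recs, &mut store, &mut txps, &mut records_for_read)?;
}
```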
+#[inline(always)]
+pub fn parse_alignments_for_barcode(
+    iter: &mut core::iter::Peekable<impl Iterator<Item = std::io::Result<RecordBuf>>>,
+    current_cb: &[u8],
+) -> anyhow::Result<Vec<RecordBuf>> {
+    //records_for_read.clear();
+    let mut records_for_barcode =
+        Vec::<RecordBuf>::with_capacity(2048);
+    let mut _records_processed = 0_usize;
+
+    // Parse the input alignment file, gathering the alignments aggregated
+    // by their source read. **Note**: this requires that we have a
+    // name-sorted input bam file (currently, aligned against the transcriptome).
+    //
+    // *NOTE*: this had to be changed from `records` to `record_bufs` or
+    // critical information was missing from the records. This happened when
+    // moving to the new version of noodles. Track `https://github.com/zaeleus/noodles/issues/230`
+    // to see if it's clear why this is the case
+
+    loop {
+        let action = if let Some(result) = iter.peek() {
+            if let Ok(record) = result {
+                // unmapped reads don't contribute to quantification
+                // but we track them.
+                if record.flags().is_unmapped() {
+                    _records_processed += 1;
+                    NextAction::SkipUnmapped
+                } else {
+                    let same_barcode = is_same_barcode(record, current_cb)?;
+                    if !same_barcode {
+                        NextAction::NewBarcode
+                    } else {
+                        NextAction::ProcessSameBarcode
+                    }
+                }
+            } else {
+                anyhow::bail!("error parsing record");
+            }
+        } else {
+            NextAction::EndOfFile
+        };
+
+        match action {
+            NextAction::SkipUnmapped => {
+                iter.next().unwrap()?;
+            }
+            NextAction::ProcessSameBarcode => {
+                let record = iter.next().unwrap()?;
+                records_for_barcode.push(record.clone());
+            }
+            NextAction::NewBarcode | NextAction::EndOfFile => {
+                break;
+            }
+        };
+    }
+    Ok(records_for_barcode)
+}
+
 pub fn parse_alignments(
     store: &mut InMemoryAlignmentStore,
     header: &Header,
     reader: &mut bam::io::Reader,
     txps: &mut [TranscriptInfo],
+    quiet: bool,
 ) -> anyhow::Result<()> {
     // we'll need these to keep track of which alignments belong
     // to which reads.
@@ -54,6 +235,21 @@ pub fn parse_alignments( let mut num_unmapped = 0_u64; let mut records_for_read = vec![]; + let pb = if quiet { + indicatif::ProgressBar::hidden() + } else { + indicatif::ProgressBar::new_spinner().with_message("Number of reads mapped") + }; + + pb.set_style( + indicatif::ProgressStyle::with_template( + "[{elapsed_precise}] {spinner:4.green/blue} {msg} {human_pos:>12}", + ) + .unwrap() + .tick_chars("⠁⠁⠉⠙⠚⠒⠂⠂⠒⠲⠴⠤⠄⠄⠤⠠⠠⠤⠦⠖⠒⠐⠐⠒⠓⠋⠉⠈⠈"), + ); + pb.set_draw_target(indicatif::ProgressDrawTarget::stderr_with_hz(4)); + // Parse the input alignemnt file, gathering the alignments aggregated // by their source read. **Note**: this requires that we have a // name-sorted input bam file (currently, aligned against the transcriptome). @@ -62,12 +258,10 @@ pub fn parse_alignments( // critical information was missing from the records. This happened when // moving to the new version of noodles. Track `https://github.com/zaeleus/noodles/issues/230` // to see if it's clear why this is the case - for (i, result) in reader.record_bufs(header).enumerate() { + for result in reader.record_bufs(header) { let record = result?; + pb.inc(1); - if i % 100_000 == 1 { - trace!("processed {i} alignment records"); - } // unmapped reads don't contribute to quantification // but we track them. if record.flags().is_unmapped() { @@ -113,6 +307,8 @@ pub fn parse_alignments( records_for_read.clear(); } + pb.finish_with_message("Finished processing alignments."); + info!( "the alignment file contained {} unmapped read records.", num_unmapped.to_formatted_string(&Locale::en) diff --git a/src/bulk.rs b/src/bulk.rs new file mode 100644 index 0000000..a6b7afe --- /dev/null +++ b/src/bulk.rs @@ -0,0 +1,480 @@ +use crate::alignment_parser; +use crate::em; +use crate::kde_utils; +use crate::prog_opts::Args; +use crate::util::oarfish_types::AlnInfo; +use crate::util::oarfish_types::DiscardTable; +use crate::util::oarfish_types::{ + AlignmentFilters, EMInfo, InMemoryAlignmentStore, TranscriptInfo, +}; +use crate::util::read_function::read_short_quant_vec; +use crate::util::write_function::{write_infrep_file, write_output}; +use crate::{binomial_continuous_prob, normalize_read_probs}; +use arrow2::{array::Float64Array, chunk::Chunk, datatypes::Field}; +use crossbeam::channel::bounded; +use crossbeam::channel::Receiver; +use crossbeam::channel::Sender; +#[allow(unused_imports)] +use minimap2_sys as mm_ffi; +use minimap2_temp as minimap2; +use needletail::parse_fastx_file; +use noodles_bam as bam; +use num_format::{Locale, ToFormattedString}; +use serde_json::json; +use std::io::BufRead; +use tracing::{info, warn}; + +/// Produce a [serde_json::Value] that encodes the relevant arguments and +/// parameters of the run that we wish to record to file. Ultimately, this +/// will be written to the corresponding `meta_info.json` file for this run. 
+fn get_json_info(args: &Args, emi: &EMInfo, seqcol_digest: &str) -> serde_json::Value { + let prob = if args.model_coverage { + "scaled_binomial" + } else { + "no_coverage" + }; + + let source = if args.alignments.is_some() { + "from_bam" + } else { + "from_raw_reads" + }; + + json!({ + "prob_model" : prob, + "alignment_source" : source, + "bin_width" : args.bin_width, + "filter_options" : &emi.eq_map.filter_opts, + "discard_table" : &emi.eq_map.discard_table, + "alignments": &args.alignments, + "output": &args.output, + "verbose": &args.verbose, + "single_cell": &args.single_cell, + "quiet": &args.quiet, + "em_max_iter": &args.max_em_iter, + "em_convergence_thresh": &args.convergence_thresh, + "threads": &args.threads, + "filter_group": &args.filter_group, + "short_quant": &args.short_quant, + "num_bootstraps": &args.num_bootstraps, + "seqcol_digest": seqcol_digest + }) +} + +fn perform_inference_and_write_output( + header: &noodles_sam::header::Header, + store: &mut InMemoryAlignmentStore, + txps: &mut [TranscriptInfo], + txps_name: &[String], + seqcol_digest: String, + args: &Args, +) -> anyhow::Result<()> { + // print discard table information in which the user might be interested. + info!("\ndiscard_table: \n{}\n", store.discard_table.to_table()); + + // if we are using the KDE, create that here. + let kde_opt: Option = if args.use_kde { + Some(kde_utils::get_kde_model(txps, store)?) + } else { + None + }; + + if store.filter_opts.model_coverage { + //obtaining the Cumulative Distribution Function (CDF) for each transcript + binomial_continuous_prob(txps, &args.bin_width, args.threads); + //Normalize the probabilities for the records of each read + normalize_read_probs(store, txps, &args.bin_width); + } + + info!( + "Total number of alignment records : {}", + store.total_len().to_formatted_string(&Locale::en) + ); + info!( + "number of aligned reads : {}", + store.num_aligned_reads().to_formatted_string(&Locale::en) + ); + info!( + "number of unique alignments : {}", + store.unique_alignments().to_formatted_string(&Locale::en) + ); + + // if we are seeding the quantification estimates with short read + // abundances, then read those in here. + let init_abundances = args.short_quant.as_ref().map(|sr_path| { + read_short_quant_vec(sr_path, txps_name).unwrap_or_else(|e| panic!("{}", e)) + }); + + // wrap up all of the relevant information we need for estimation + // in an EMInfo struct and then call the EM algorithm. + let emi = EMInfo { + eq_map: store, + txp_info: txps, + max_iter: args.max_em_iter, + convergence_thresh: args.convergence_thresh, + init_abundances, + kde_model: kde_opt, + }; + + if args.use_kde { + /* + // run EM for model train iterations + let orig_iter = emi.max_iter; + emi.max_iter = 10; + let counts = em::em(&emi, args.threads); + // relearn the kde + let new_model = + kde_utils::refresh_kde_model(&txps, &store, &emi.kde_model.unwrap(), &counts); + info!("refreshed KDE model"); + emi.kde_model = Some(new_model?); + emi.max_iter = orig_iter; + */ + } + + let counts = if args.threads > 4 { + em::em_par(&emi, args.threads) + } else { + em::em(&emi, args.threads) + }; + + let aux_txp_counts = crate::util::aux_counts::get_aux_counts(store, txps)?; + + // prepare the JSON object we'll write + // to meta_info.json + let json_info = get_json_info(args, &emi, &seqcol_digest); + + // write the output + write_output(&args.output, json_info, header, &counts, &aux_txp_counts)?; + + // if the user requested bootstrap replicates, + // compute and write those out now. 
+    if args.num_bootstraps > 0 {
+        let breps = em::bootstrap(&emi, args.num_bootstraps, args.threads);
+
+        let mut new_arrays = vec![];
+        let mut bs_fields = vec![];
+        for (i, b) in breps.into_iter().enumerate() {
+            let bs_array = Float64Array::from_vec(b);
+            bs_fields.push(Field::new(
+                format!("bootstrap.{}", i),
+                bs_array.data_type().clone(),
+                false,
+            ));
+            new_arrays.push(bs_array.boxed());
+        }
+        let chunk = Chunk::new(new_arrays);
+        write_infrep_file(&args.output, bs_fields, chunk)?;
+    }
+    Ok(())
+}
+
+pub fn quantify_bulk_alignments_from_bam(
+    header: &noodles_sam::Header,
+    filter_opts: AlignmentFilters,
+    reader: &mut bam::io::Reader<noodles_bgzf::MultithreadedReader<std::fs::File>>,
+    txps: &mut [TranscriptInfo],
+    txps_name: &[String],
+    args: &Args,
+    seqcol_digest: String,
+) -> anyhow::Result<()> {
+    // now parse the actual alignments for the reads and store the results
+    // in our in-memory store
+    let mut store = InMemoryAlignmentStore::new(filter_opts, header);
+    alignment_parser::parse_alignments(&mut store, header, reader, txps, args.quiet)?;
+    perform_inference_and_write_output(header, &mut store, txps, txps_name, seqcol_digest, args)
+}
+
+#[derive(Clone)]
+struct ReadChunkWithNames {
+    read_seq: Vec<u8>,
+    read_names: Vec<u8>,
+    seq_sep: Vec<usize>,
+    name_sep: Vec<usize>,
+}
+
+impl ReadChunkWithNames {
+    pub fn new() -> Self {
+        Self {
+            read_seq: Vec::new(),
+            read_names: Vec::new(),
+            seq_sep: vec![0usize],
+            name_sep: vec![0usize],
+        }
+    }
+
+    #[inline(always)]
+    pub fn add_id_and_read(&mut self, id: &[u8], read: &[u8]) {
+        self.read_names.extend_from_slice(id);
+        self.read_names.push(b'\0');
+        self.read_seq.extend_from_slice(read);
+        self.name_sep.push(self.read_names.len());
+        self.seq_sep.push(self.read_seq.len());
+    }
+
+    #[inline(always)]
+    pub fn clear(&mut self) {
+        self.read_names.clear();
+        self.read_seq.clear();
+        self.name_sep.clear();
+        self.name_sep.push(0);
+        self.seq_sep.clear();
+        self.seq_sep.push(0);
+    }
+
+    pub fn iter(&self) -> ReadChunkIter {
+        ReadChunkIter {
+            chunk: self,
+            pos: 0,
+        }
+    }
+}
+
+struct ReadChunkIter<'a> {
+    chunk: &'a ReadChunkWithNames,
+    pos: usize,
+}
+
+impl<'a> Iterator for ReadChunkIter<'a> {
+    type Item = (&'a [u8], &'a [u8]);
+
+    #[inline(always)]
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.pos < self.chunk.seq_sep.len() - 1 {
+            let i = self.pos;
+            let name: &[u8] =
+                &self.chunk.read_names[self.chunk.name_sep[i]..self.chunk.name_sep[i + 1]];
+            let seq: &[u8] = &self.chunk.read_seq[self.chunk.seq_sep[i]..self.chunk.seq_sep[i + 1]];
+            self.pos += 1;
+            Some((name, seq))
+        } else {
+            None
+        }
+    }
+
+    #[inline(always)]
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let rem = (self.chunk.seq_sep.len() - 1) - self.pos;
+        (rem, Some(rem))
+    }
+}
+
+impl<'a> ExactSizeIterator for ReadChunkIter<'a> {}
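To make the flat-buffer layout concrete, here is a small usage sketch (illustrative only, not part of the patch): ids and sequences are packed into two flat byte buffers with separator offsets, and the iterator recovers `(name, seq)` slice pairs without any per-read allocation.

```rust
// Minimal sketch: pack two reads and iterate over (name, seq) pairs.
let mut chunk = ReadChunkWithNames::new();
chunk.add_id_and_read(b"read1", b"ACGT");
chunk.add_id_and_read(b"read2", b"GGC");
for (name, seq) in chunk.iter() {
    // note: `name` still carries the trailing NUL byte used as a separator
    println!("{} -> {} bases", String::from_utf8_lossy(name), seq.len());
}
```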
+#[allow(clippy::too_many_arguments)]
+pub fn quantify_bulk_alignments_raw_reads(
+    header: &noodles_sam::Header,
+    aligner: minimap2::Aligner,
+    filter_opts: AlignmentFilters,
+    read_paths: &[std::path::PathBuf],
+    txps: &mut [TranscriptInfo],
+    txps_name: &[String],
+    args: &Args,
+    seqcol_digest: String,
+) -> anyhow::Result<()> {
+    // now parse the actual alignments for the reads and store the results
+    // in our in-memory store
+
+    // we will take only shared refs to this
+    let mut txp_info_view: Vec<TranscriptInfo> = Vec::with_capacity(txps.len());
+    for ti in txps.iter() {
+        txp_info_view.push(ti.clone());
+    }
+
+    // use at least one mapping thread; the other threads are taken by the
+    // fastx parser and the in-memory alignment store populator
+    let map_threads = args.threads.saturating_sub(2).max(1);
+
+    type ReadGroup = ReadChunkWithNames;
+    type AlignmentGroupInfo = (Vec<AlnInfo>, Vec<f32>, Vec<usize>);
+    let mut store = std::thread::scope(|s| {
+        const READ_CHUNK_SIZE: usize = 200;
+        const ALN_GROUP_CHUNK_LIMIT: usize = 100;
+
+        let (read_sender, read_receiver): (Sender<ReadGroup>, Receiver<ReadGroup>) =
+            bounded(args.threads * 10);
+        let (aln_group_sender, aln_group_receiver): (
+            Sender<AlignmentGroupInfo>,
+            Receiver<AlignmentGroupInfo>,
+        ) = bounded(args.threads * 100);
+
+        // Producer thread: reads sequences and sends them to the channel
+        let producer = s.spawn(move || {
+            let mut ctr = 0_usize;
+            let mut chunk_size = 0_usize;
+            let mut read_chunk = ReadChunkWithNames::new();
+
+            for read_path in read_paths {
+                let mut reader =
+                    parse_fastx_file(read_path).expect("valid path/file to read sequences");
+
+                while let Some(result) = reader.next() {
+                    let record = result.expect("Error reading record");
+
+                    chunk_size += 1;
+                    ctr += 1;
+
+                    // put this read on the current chunk
+                    read_chunk.add_id_and_read(record.id(), &record.seq());
+
+                    // send off the next chunks of reads to a thread
+                    if chunk_size >= READ_CHUNK_SIZE {
+                        read_sender
+                            .send(read_chunk.clone())
+                            .expect("Error sending sequence");
+                        // prepare for the next chunk
+                        read_chunk.clear();
+                        chunk_size = 0;
+                    }
+                }
+            }
+            // if any reads remain, send them off
+            if chunk_size > 0 {
+                read_sender
+                    .send(read_chunk)
+                    .expect("Error sending sequence");
+            }
+            ctr
+        });
+
+        // Consumer threads: receive sequences and perform alignment
+        let consumers: Vec<_> = (0..map_threads)
+            .map(|_| {
+                let receiver = read_receiver.clone();
+                let mut filter = filter_opts.clone();
+                let mut loc_aligner = aligner.clone().with_cigar();
+
+                let my_txp_info_view = &txp_info_view;
+                let aln_group_sender = aln_group_sender.clone();
+                s.spawn(move || {
+                    let mut discard_table = DiscardTable::new();
+
+                    let mut chunk_size = 0_usize;
+                    let mut aln_group_alns: Vec<AlnInfo> = Vec::new();
+                    let mut aln_group_probs: Vec<f32> = Vec::new();
+                    let mut aln_group_boundaries: Vec<usize> = Vec::new();
+                    aln_group_boundaries.push(0);
+
+                    // get the next chunk of reads
+                    for read_chunk in receiver {
+                        // iterate over every read
+                        for (name, seq) in read_chunk.iter() {
+                            // map the next read, with cigar string
+                            let map_res_opt =
+                                loc_aligner.map_with_name(name, seq, true, false, None, None);
+                            if let Ok(mut mappings) = map_res_opt {
+                                let (ag, aprobs) = filter.filter(
+                                    &mut discard_table,
+                                    header,
+                                    my_txp_info_view,
+                                    &mut mappings,
+                                );
+                                if !ag.is_empty() {
+                                    aln_group_alns.extend_from_slice(&ag);
+                                    aln_group_probs.extend_from_slice(&aprobs);
+                                    aln_group_boundaries.push(aln_group_alns.len());
+                                    chunk_size += 1;
+                                }
+                                if chunk_size >= ALN_GROUP_CHUNK_LIMIT {
+                                    aln_group_sender
+                                        .send((
+                                            aln_group_alns.clone(),
+                                            aln_group_probs.clone(),
+                                            aln_group_boundaries.clone(),
+                                        ))
+                                        .expect("Error sending alignment group");
+                                    aln_group_alns.clear();
+                                    aln_group_probs.clear();
+                                    aln_group_boundaries.clear();
+                                    aln_group_boundaries.push(0);
+                                    chunk_size = 0;
+                                }
+                            } else {
+                                warn!(
+                                    "Error encountered mapping read : {}",
+                                    map_res_opt.unwrap_err()
+                                );
+                            }
+                        }
+                    }
+                    if chunk_size > 0 {
+                        aln_group_sender
+                            .send((aln_group_alns, aln_group_probs, aln_group_boundaries))
+                            .expect("Error sending alignment group");
+                    }
+                    // NOTE: because `clone()` clones the raw pointer, if it is
+                    // still set when this thread goes out of scope, the underlying
+                    // raw pointer will be freed and the other aligners will have
+                    // references to already freed memory and this will lead to a
+                    // double-free when they are dropped.
+                    // So, to avoid this, here
+                    // we set the idx pointer to None directly. Track:
+                    // https://github.com/jguhlin/minimap2-rs/issues/71
+                    loc_aligner.idx = None;
+                    discard_table
+                })
+            })
+            .collect();
+
+        #[allow(clippy::useless_asref)]
+        let txps_mut = txps.as_mut();
+        let filter_opts_store = filter_opts.clone();
+        let aln_group_consumer = s.spawn(move || {
+            let mut store = InMemoryAlignmentStore::new(filter_opts_store, header);
+            let pb = if args.quiet {
+                indicatif::ProgressBar::hidden()
+            } else {
+                indicatif::ProgressBar::new_spinner().with_message("Number of reads mapped")
+            };
+
+            pb.set_style(
+                indicatif::ProgressStyle::with_template(
+                    "[{elapsed_precise}] {spinner:4.green/blue} {msg} {human_pos:>12}",
+                )
+                .unwrap()
+                .tick_chars("⠁⠁⠉⠙⠚⠒⠂⠂⠒⠲⠴⠤⠄⠄⠤⠠⠠⠤⠦⠖⠒⠐⠐⠒⠓⠋⠉⠈⠈"),
+            );
+            pb.set_draw_target(indicatif::ProgressDrawTarget::stderr_with_hz(4));
+
+            for (ags, aprobs, aln_boundaries) in aln_group_receiver {
+                for window in aln_boundaries.windows(2) {
+                    pb.inc(1);
+                    let group_start = window[0];
+                    let group_end = window[1];
+                    let ag = &ags[group_start..group_end];
+                    let as_probs = &aprobs[group_start..group_end];
+                    if ag.len() == 1 {
+                        store.inc_unique_alignments();
+                    }
+                    store.add_filtered_group(ag, as_probs, txps_mut);
+                }
+            }
+            pb.finish_with_message("Finished aligning reads.");
+            store
+        });
+
+        // Wait for the producer to finish reading
+        let total_reads = producer.join().expect("Producer thread panicked");
+
+        let mut discard_tables: Vec<DiscardTable> = Vec::with_capacity(map_threads);
+        for consumer in consumers {
+            let dt = consumer.join().expect("Consumer thread panicked");
+            discard_tables.push(dt);
+        }
+
+        drop(aln_group_sender);
+
+        let mut store = aln_group_consumer
+            .join()
+            .expect("Alignment group consumer panicked");
+
+        info!(
+            "Parsed {} total reads",
+            total_reads.to_formatted_string(&Locale::en)
+        );
+
+        for dt in &discard_tables {
+            store.aggregate_discard_table(dt);
+        }
+        store
+    });
+
+    perform_inference_and_write_output(header, &mut store, txps, txps_name, seqcol_digest, args)
+}
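The thread topology used above — one fastx-parsing producer, `map_threads` mapping consumers, and a single store-populating consumer, joined by bounded crossbeam channels — can be distilled to the following self-contained sketch (with placeholder work in place of minimap2 mapping):

```rust
use crossbeam::channel::bounded;

// Stripped-down sketch of the bounded-channel fan-out/fan-in used above.
fn sketch() {
    std::thread::scope(|s| {
        let (tx, rx) = bounded::<Vec<u8>>(10);
        let producer = s.spawn(move || {
            for read in [b"ACGT".to_vec(), b"GGTA".to_vec()] {
                tx.send(read).expect("send failed");
            }
            // dropping `tx` closes the channel and ends the consumers' loops
        });
        let consumers: Vec<_> = (0..2)
            .map(|_| {
                let rx = rx.clone();
                // placeholder "work": count bases instead of mapping reads
                s.spawn(move || rx.iter().map(|r| r.len()).sum::<usize>())
            })
            .collect();
        producer.join().unwrap();
        let total: usize = consumers.into_iter().map(|c| c.join().unwrap()).sum();
        println!("processed {total} bases");
    });
}
```

The bounded capacities serve as backpressure: a fast parser cannot race arbitrarily far ahead of the mapping threads, which keeps memory use flat.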
diff --git a/src/em.rs b/src/em.rs
index a2f6ee3..ed8b910 100644
--- a/src/em.rs
+++ b/src/em.rs
@@ -146,7 +146,7 @@ pub fn do_em<'a, I: Iterator + 'a,
 ) -> Vec<f64> {
     let eq_map = em_info.eq_map;
     let fops = &eq_map.filter_opts;
-    let tinfo: &Vec<TranscriptInfo> = em_info.txp_info;
+    let tinfo: &[TranscriptInfo] = em_info.txp_info;
     let max_iter = em_info.max_iter;
     let convergence_thresh = em_info.convergence_thresh;
     let total_weight: f64 = eq_map.num_aligned_reads() as f64;
@@ -327,7 +327,7 @@ pub fn em_par(em_info: &EMInfo, nthreads: usize) -> Vec<f64> {
     let eq_map = em_info.eq_map;
     let fops = &eq_map.filter_opts;
-    let tinfo: &Vec<TranscriptInfo> = em_info.txp_info;
+    let tinfo: &[TranscriptInfo] = em_info.txp_info;
     let max_iter = em_info.max_iter;
     let convergence_thresh = em_info.convergence_thresh;
     let total_weight: f64 = eq_map.num_aligned_reads() as f64;
diff --git a/src/main.rs b/src/main.rs
index 688b424..b517dbf 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,215 +2,225 @@ use clap::Parser;
 use std::num::NonZeroUsize;
 
 use anyhow::Context;
-use arrow2::{array::Float64Array, chunk::Chunk, datatypes::Field};
-
-use std::{fs::File, io, path::PathBuf};
+use core::ffi;
+use minimap2_sys as mm_ffi;
+use minimap2_temp as minimap2;
 
 use num_format::{Locale, ToFormattedString};
-use serde::Serialize;
-use serde_json::json;
+use std::{fs::File, io};
+
 use tracing::info;
 use tracing_subscriber::{filter::LevelFilter, fmt, prelude::*, EnvFilter};
 
 use noodles_bam as bam;
 use noodles_bgzf as bgzf;
+use noodles_sam::header::record::value as header_val;
+use noodles_sam::header::record::value::Map as HeaderMap;
 
 mod alignment_parser;
 mod bootstrap;
+mod bulk;
 mod em;
+mod prog_opts;
+mod single_cell;
 mod util;
 
+use crate::prog_opts::{Args, FilterGroup, SequencingTech};
 use crate::util::normalize_probability::normalize_read_probs;
-use crate::util::oarfish_types::{
-    AlignmentFilters, EMInfo, InMemoryAlignmentStore, TranscriptInfo,
-};
-use crate::util::read_function::read_short_quant_vec;
-use crate::util::write_function::{write_infrep_file, write_output};
+use crate::util::oarfish_types::{AlignmentFilters, TranscriptInfo};
 use crate::util::{binomial_probability::binomial_continuous_prob, kde_utils};
 
-/// These represent different "meta-options", specific settings
-/// for all of the different filters that should be applied in
-/// different cases.
-#[derive(Clone, Debug, clap::ValueEnum, Serialize)]
-enum FilterGroup {
-    NoFilters,
-    NanocountFilters,
-}
+type HeaderReaderAligner = (
+    noodles_sam::header::Header,
+    Option<bam::io::Reader<bgzf::MultithreadedReader<File>>>,
+    Option<minimap2::Aligner>,
+);
+
+fn get_aligner_from_args(args: &Args) -> anyhow::Result<HeaderReaderAligner> {
+    info!("read-based mode");
+
+    // set the number of indexing threads
+    let idx_threads = &args.threads.max(1);
+
+    // if the user requested to write the output index to disk, prepare for that
+    let idx_out_as_str = args.index_out.clone().map_or(String::new(), |x| {
+        x.to_str()
+            .expect("could not convert PathBuf to &str")
+            .to_owned()
+    });
+    let idx_output = args.index_out.as_ref().map(|_| idx_out_as_str.as_str());
+
+    // create the aligner
+    let mut aligner = match args.seq_tech {
+        Some(SequencingTech::OntCDNA) | Some(SequencingTech::OntDRNA) => {
+            minimap2::Aligner::builder()
+                .with_index_threads(*idx_threads)
+                .with_cigar()
+                .map_ont()
+                .with_index(
+                    &args
+                        .reference
+                        .clone()
+                        .expect("must provide reference sequence"),
+                    idx_output,
+                )
+                .expect("could not construct minimap2 index")
+        }
+        Some(SequencingTech::PacBio) => minimap2::Aligner::builder()
+            .with_index_threads(*idx_threads)
+            .with_cigar()
+            .map_pb()
+            .with_index(
+                &args
+                    .reference
+                    .clone()
+                    .expect("must provide reference sequence"),
+                idx_output,
+            )
+            .expect("could not construct minimap2 index"),
+        Some(SequencingTech::PacBioHifi) => minimap2::Aligner::builder()
+            .with_index_threads(*idx_threads)
+            .with_cigar()
+            .map_hifi()
+            .with_index(
+                &args
+                    .reference
+                    .clone()
+                    .expect("must provide reference sequence"),
+                idx_output,
+            )
+            .expect("could not construct minimap2 index"),
+        None => {
+            anyhow::bail!("sequencing tech must be provided in read mode, but it was not!");
+        }
+    };
+
+    info!("created aligner index opts : {:?}", aligner.idxopt);
+    // set the maximum number of secondary alignments (minimap2's `-N`) to consider
+    aligner.mapopt.best_n = args.best_n as i32;
+    aligner.mapopt.seed = 11;
+
+    let mmi: mm_ffi::mm_idx_t = unsafe { **aligner.idx.as_ref().unwrap() };
+    let n_seq = mmi.n_seq;
+
+    info!(
+        "index contains {} sequences",
+        n_seq.to_formatted_string(&Locale::en)
+    );
 
-fn parse_strand(arg: &str) -> anyhow::Result<bio_types::strand::Strand> {
-    match arg {
-        "+" | "fw" | "FW" | "f" | "F" => Ok(bio_types::strand::Strand::Forward),
-        "-" | "rc" | "RC" | "r" | "R" => Ok(bio_types::strand::Strand::Reverse),
-        "." | "both" | "either" => Ok(bio_types::strand::Strand::Unknown),
-        _ => anyhow::bail!("Cannot parse {} as a valid strand type", arg),
+    let mut header = noodles_sam::header::Header::builder();
+
+    #[derive(Debug, PartialEq, Eq)]
+    pub struct SeqMetaData {
+        pub name: String,
+        pub length: u32,
+        pub is_alt: bool,
     }
-}
 
-/// accurate transcript quantification from long-read RNA-seq data
-#[derive(Parser, Debug, Serialize)]
-#[clap(author, version, about, long_about = None)]
-struct Args {
-    /// be quiet (i.e. don't output log messages that aren't at least warnings)
-    #[arg(long, conflicts_with = "verbose")]
-    quiet: bool,
-
-    /// be verbose (i.e. output all non-developer logging messages)
-    #[arg(long)]
-    verbose: bool,
-
-    /// path to the file containing the input alignments
-    #[arg(short, long, required = true)]
-    alignments: PathBuf,
-    /// location where output quantification file should be written
-    #[arg(short, long, required = true)]
-    output: PathBuf,
-
-    #[arg(long, help_heading = "filters", value_enum)]
-    filter_group: Option<FilterGroup>,
-
-    /// maximum allowable distance of the right-most end of an alignment from the 3' transcript end
-    #[arg(short, long, conflicts_with = "filter-group", help_heading="filters", default_value_t = u32::MAX as i64)]
-    three_prime_clip: i64,
-    /// maximum allowable distance of the left-most end of an alignment from the 5' transcript end
-    #[arg(short, long, conflicts_with = "filter-group", help_heading="filters", default_value_t = u32::MAX)]
-    five_prime_clip: u32,
-    /// fraction of the best possible alignment score that a secondary alignment must have for
-    /// consideration
-    #[arg(
-        short,
-        long,
-        conflicts_with = "filter-group",
-        help_heading = "filters",
-        default_value_t = 0.95
-    )]
-    score_threshold: f32,
-    /// fraction of a query that must be mapped within an alignemnt to consider the alignemnt
-    /// valid
-    #[arg(
-        short,
-        long,
-        conflicts_with = "filter-group",
-        help_heading = "filters",
-        default_value_t = 0.5
-    )]
-    min_aligned_fraction: f32,
-    /// minimum number of nucleotides in the aligned portion of a read
-    #[arg(
-        short = 'l',
-        long,
-        conflicts_with = "filter-group",
-        help_heading = "filters",
-        default_value_t = 50
-    )]
-    min_aligned_len: u32,
-    /// only alignments to this strand will be allowed; options are (fw /+, rc/-, or both/.)
-    #[arg(
-        short = 'd',
-        long,
-        conflicts_with = "filter-group",
-        help_heading = "filters",
-        default_value_t = bio_types::strand::Strand::Unknown,
-        value_parser = parse_strand
-    )]
-    strand_filter: bio_types::strand::Strand,
-    /// apply the coverage model
-    #[arg(long, help_heading = "coverage model", value_parser)]
-    model_coverage: bool,
-    /// maximum number of iterations for which to run the EM algorithm
-    #[arg(long, help_heading = "EM", default_value_t = 1000)]
-    max_em_iter: u32,
-    /// maximum number of iterations for which to run the EM algorithm
-    #[arg(long, help_heading = "EM", default_value_t = 1e-3)]
-    convergence_thresh: f64,
-    /// maximum number of cores that the oarfish can use to obtain binomial probability
-    #[arg(short = 'j', long, default_value_t = 1)]
-    threads: usize,
-    /// location of short read quantification (if provided)
-    #[arg(short = 'q', long, help_heading = "EM")]
-    short_quant: Option<String>,
-    /// number of bootstrap replicates to produce to assess quantification uncertainty
-    #[arg(long, default_value_t = 0)]
-    num_bootstraps: u32,
-    /// width of the bins used in the coverage model
-    #[arg(short, long, help_heading = "coverage model", default_value_t = 100)]
-    bin_width: u32,
-    /// use a KDE model of the observed fragment length distribution
-    #[arg(short, long, hide = true)]
-    use_kde: bool,
+    // TODO: better creation of the header
+    for i in 0..mmi.n_seq {
+        let _seq = unsafe { *(mmi.seq).offset(i as isize) };
+        let c_str = unsafe { ffi::CStr::from_ptr(_seq.name) };
+        let rust_str = c_str.to_str().unwrap().to_string();
+        header = header.add_reference_sequence(
+            rust_str,
+            HeaderMap::<header_val::map::ReferenceSequence>::new(NonZeroUsize::try_from(
+                _seq.len as usize,
+            )?),
+        );
+    }
+
+    header = header.add_program(
+        "minimap2-rs",
+        HeaderMap::<header_val::map::Program>::default(),
+    );
+
+    let header = header.build();
+    Ok((header, None, Some(aligner)))
 }
 
-fn get_filter_opts(args: &Args) -> AlignmentFilters {
+fn get_filter_opts(args: &Args) -> anyhow::Result<AlignmentFilters> {
     // set all of the filter options that the user
     // wants to apply.
match args.filter_group { Some(FilterGroup::NoFilters) => { info!("disabling alignment filters."); - AlignmentFilters::builder() - .five_prime_clip(u32::MAX) - .three_prime_clip(i64::MAX) - .score_threshold(0_f32) - .min_aligned_fraction(0_f32) - .min_aligned_len(1_u32) + // override individual parameters if the user passed them in explicitly + let fpc = args + .five_prime_clip + .provided_or_u32("overriding 5' clip with user-provided value", u32::MAX); + let tpc = args + .three_prime_clip + .provided_or_i64("overriding 3' clip with user-provided value", i64::MAX); + let st = args + .score_threshold + .provided_or_f32("overriding score threshold with user-provided value", 0_f32); + let maf = args.min_aligned_fraction.provided_or_f32( + "overriding min aligned fraction with user-provided value", + 0_f32, + ); + let mal = args.min_aligned_len.provided_or_u32( + "overriding min aligned length with user-provided value", + 1_u32, + ); + + Ok(AlignmentFilters::builder() + .five_prime_clip(fpc) + .three_prime_clip(tpc) + .score_threshold(st) + .min_aligned_fraction(maf) + .min_aligned_len(mal) .which_strand(args.strand_filter) .model_coverage(args.model_coverage) - .build() + .build()) } Some(FilterGroup::NanocountFilters) => { info!("setting filters to nanocount defaults."); - AlignmentFilters::builder() - .five_prime_clip(u32::MAX) - .three_prime_clip(50_i64) - .score_threshold(0.95_f32) - .min_aligned_fraction(0.5_f32) - .min_aligned_len(50_u32) + // override individual parameters if the user passed them in explicitly + let fpc = args + .five_prime_clip + .provided_or_u32("overriding 5' clip with user-provided value", u32::MAX); + let tpc = args + .three_prime_clip + .provided_or_i64("overriding 3' clip with user-provided value", 50_i64); + let st = args.score_threshold.provided_or_f32( + "overriding score threshold with user-provided value", + 0.95_f32, + ); + let maf = args.min_aligned_fraction.provided_or_f32( + "overriding min aligned fraction with user-provided value", + 0.5_f32, + ); + let mal = args.min_aligned_len.provided_or_u32( + "overriding min aligned length with user-provided value", + 50_u32, + ); + + Ok(AlignmentFilters::builder() + .five_prime_clip(fpc) + .three_prime_clip(tpc) + .score_threshold(st) + .min_aligned_fraction(maf) + .min_aligned_len(mal) .which_strand(bio_types::strand::Strand::Forward) .model_coverage(args.model_coverage) - .build() + .build()) } None => { info!("setting user-provided filter parameters."); - AlignmentFilters::builder() - .five_prime_clip(args.five_prime_clip) - .three_prime_clip(args.three_prime_clip) - .score_threshold(args.score_threshold) - .min_aligned_fraction(args.min_aligned_fraction) - .min_aligned_len(args.min_aligned_len) + Ok(AlignmentFilters::builder() + .five_prime_clip(args.five_prime_clip.try_as_u32()?) + .three_prime_clip(args.three_prime_clip.try_as_i64()?) + .score_threshold(args.score_threshold.try_as_f32()?) + .min_aligned_fraction(args.min_aligned_fraction.try_as_f32()?) + .min_aligned_len(args.min_aligned_len.try_as_u32()?) .which_strand(args.strand_filter) .model_coverage(args.model_coverage) - .build() + .build()) } } } -/// Produce a [serde_json::Value] that encodes the relevant arguments and -/// parameters of the run that we wish to record to file. Ultimately, this -/// will be written to the corresponding `meta_info.json` file for this run. 
-fn get_json_info(args: &Args, emi: &EMInfo, seqcol_digest: &str) -> serde_json::Value { - let prob = if args.model_coverage { - "scaled_binomial" - } else { - "no_coverage" - }; - - json!({ - "prob_model" : prob, - "bin_width" : args.bin_width, - "filter_options" : &emi.eq_map.filter_opts, - "discard_table" : &emi.eq_map.discard_table, - "alignments": &args.alignments, - "output": &args.output, - "verbose": &args.verbose, - "quiet": &args.quiet, - "em_max_iter": &args.max_em_iter, - "em_convergence_thresh": &args.convergence_thresh, - "threads": &args.threads, - "filter_group": &args.filter_group, - "short_quant": &args.short_quant, - "num_bootstraps": &args.num_bootstraps, - "seqcol_digest": seqcol_digest - }) -} - fn main() -> anyhow::Result<()> { let env_filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) @@ -237,16 +247,24 @@ fn main() -> anyhow::Result<()> { reload_handle.modify(|filter| *filter = EnvFilter::new("TRACE"))?; } - let filter_opts = get_filter_opts(&args); + let filter_opts = get_filter_opts(&args)?; + + let (header, reader, aligner) = if args.alignments.is_none() { + get_aligner_from_args(&args)? + } else { + let alignments = args.alignments.clone().unwrap(); + let afile = File::open(&alignments)?; + + let worker_count = NonZeroUsize::new(1.max(args.threads.saturating_sub(1))) + .expect("decompression threads >= 1"); + let decoder = bgzf::MultithreadedReader::with_worker_count(worker_count, afile); + let mut reader = bam::io::Reader::from(decoder); + // parse the header, and ensure that the reads were mapped with minimap2 (as far as we + // can tell). + let header = alignment_parser::read_and_verify_header(&mut reader, &alignments)?; + (header, Some(reader), None) + }; - let afile = File::open(&args.alignments)?; - let worker_count = NonZeroUsize::new(1.max(args.threads.saturating_sub(1))) - .expect("decompression threads >= 1"); - let decoder = bgzf::MultithreadedReader::with_worker_count(worker_count, afile); - let mut reader = bam::io::Reader::from(decoder); - // parse the header, and ensure that the reads were mapped with minimap2 (as far as we - // can tell). - let header = alignment_parser::read_and_verify_header(&mut reader, &args.alignments)?; let seqcol_digest = { info!("calculating seqcol digest"); let sc = seqcol_rs::SeqCol::from_sam_header( @@ -286,112 +304,59 @@ fn main() -> anyhow::Result<()> { } info!( "parsed reference information for {} transcripts.", - txps.len() + txps.len().to_formatted_string(&Locale::en) ); - // now parse the actual alignments for the reads and store the results - // in our in-memory stor - let mut store = InMemoryAlignmentStore::new(filter_opts, &header); - alignment_parser::parse_alignments(&mut store, &header, &mut reader, &mut txps)?; - - // print discard table information in which the user might be interested. - info!("\ndiscard_table: \n{}\n", store.discard_table.to_table()); - - // no longer need the reader - drop(reader); - - // if we are using the KDE, create that here. - let kde_opt: Option = if args.use_kde { - Some(kde_utils::get_kde_model(&txps, &store)?) 
- } else { - None - }; - - if store.filter_opts.model_coverage { - //obtaining the Cumulative Distribution Function (CDF) for each transcript - binomial_continuous_prob(&mut txps, &args.bin_width, args.threads); - //Normalize the probabilities for the records of each read - normalize_read_probs(&mut store, &txps, &args.bin_width); - } - - info!( - "Total number of alignment records : {}", - store.total_len().to_formatted_string(&Locale::en) - ); - info!( - "number of aligned reads : {}", - store.num_aligned_reads().to_formatted_string(&Locale::en) - ); - info!( - "number of unique alignments : {}", - store.unique_alignments().to_formatted_string(&Locale::en) - ); - - // if we are seeding the quantification estimates with short read - // abundances, then read those in here. - let init_abundances = args.short_quant.as_ref().map(|sr_path| { - read_short_quant_vec(sr_path, &txps_name).unwrap_or_else(|e| panic!("{}", e)) - }); - - // wrap up all of the relevant information we need for estimation - // in an EMInfo struct and then call the EM algorithm. - let emi = EMInfo { - eq_map: &store, - txp_info: &txps, - max_iter: args.max_em_iter, - convergence_thresh: args.convergence_thresh, - init_abundances, - kde_model: kde_opt, - }; - - if args.use_kde { - /* - // run EM for model train iterations - let orig_iter = emi.max_iter; - emi.max_iter = 10; - let counts = em::em(&emi, args.threads); - // relearn the kde - let new_model = - kde_utils::refresh_kde_model(&txps, &store, &emi.kde_model.unwrap(), &counts); - info!("refreshed KDE model"); - emi.kde_model = Some(new_model?); - emi.max_iter = orig_iter; - */ - } - - let counts = if args.threads > 4 { - em::em_par(&emi, args.threads) + if args.single_cell { + // TODO: do this better (quiet the EM during single-cell quant) + reload_handle.modify(|filter| { + *filter = if args.quiet { + EnvFilter::new("WARN") + .add_directive("oarfish=warn".parse().unwrap()) + .add_directive("oarfish::single_cell=warn".parse().unwrap()) + } else if args.verbose { + EnvFilter::new("TRACE") + .add_directive("oarfish=info".parse().unwrap()) + .add_directive("oarfish::single_cell=trace".parse().unwrap()) + } else { + // be quiet about normal things in single-cell mode + // e.g. EM iterations, and only print out info for + // oarfish::single_cell events. + EnvFilter::new("INFO") + .add_directive("oarfish=warn".parse().unwrap()) + .add_directive("oarfish::single_cell=info".parse().unwrap()) + } + })?; + + single_cell::quantify_single_cell_from_collated_bam( + &header, + &filter_opts, + &mut reader.unwrap(), + &mut txps, + &args, + seqcol_digest, + )?; + } else if args.alignments.is_some() { + bulk::quantify_bulk_alignments_from_bam( + &header, + filter_opts, + &mut reader.unwrap(), + &mut txps, + &txps_name, + &args, + seqcol_digest, + )?; } else { - em::em(&emi, args.threads) - }; - - let aux_txp_counts = crate::util::aux_counts::get_aux_counts(&store, &txps)?; - - // prepare the JSON object we'll write - // to meta_info.json - let json_info = get_json_info(&args, &emi, &seqcol_digest); - - // write the output - write_output(&args.output, json_info, &header, &counts, &aux_txp_counts)?; - - // if the user requested bootstrap replicates, - // compute and write those out now. 
-    if args.num_bootstraps > 0 {
-        let breps = em::bootstrap(&emi, args.num_bootstraps, args.threads);
-
-        let mut new_arrays = vec![];
-        let mut bs_fields = vec![];
-        for (i, b) in breps.into_iter().enumerate() {
-            let bs_array = Float64Array::from_vec(b);
-            bs_fields.push(Field::new(
-                format!("bootstrap.{}", i),
-                bs_array.data_type().clone(),
-                false,
-            ));
-            new_arrays.push(bs_array.boxed());
-        }
-        let chunk = Chunk::new(new_arrays);
-        write_infrep_file(&args.output, bs_fields, chunk)?;
+        bulk::quantify_bulk_alignments_raw_reads(
+            &header,
+            aligner.expect("need valid aligner to align reads"),
+            filter_opts,
+            &args.reads.clone().expect("expected read file(s)"),
+            &mut txps,
+            &txps_name,
+            &args,
+            seqcol_digest,
+        )?;
     }
 
     Ok(())
diff --git a/src/prog_opts.rs b/src/prog_opts.rs
new file mode 100644
index 0000000..b515aa3
--- /dev/null
+++ b/src/prog_opts.rs
@@ -0,0 +1,331 @@
+use clap::{Parser, builder::ArgPredicate};
+use serde::Serialize;
+use tracing::info;
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::fmt;
+
+/// These represent different "meta-options", specific settings
+/// for all of the different filters that should be applied in
+/// different cases.
+#[derive(Clone, Debug, clap::ValueEnum, Serialize)]
+pub enum FilterGroup {
+    NoFilters,
+    NanocountFilters,
+}
+
+fn parse_strand(arg: &str) -> anyhow::Result<bio_types::strand::Strand> {
+    match arg {
+        "+" | "fw" | "FW" | "f" | "F" => Ok(bio_types::strand::Strand::Forward),
+        "-" | "rc" | "RC" | "r" | "R" => Ok(bio_types::strand::Strand::Reverse),
+        "." | "both" | "either" => Ok(bio_types::strand::Strand::Unknown),
+        _ => anyhow::bail!("Cannot parse {} as a valid strand type", arg),
+    }
+}
+
+#[derive(Debug, Clone, clap::ValueEnum, Serialize)]
+pub enum SequencingTech {
+    OntCDNA,
+    OntDRNA,
+    PacBio,
+    PacBioHifi,
+}
+
+impl FromStr for SequencingTech {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "ont" => Ok(SequencingTech::OntCDNA),
+            "ont-cdna" => Ok(SequencingTech::OntCDNA),
+            "ont-drna" => Ok(SequencingTech::OntDRNA),
+            "pb" => Ok(SequencingTech::PacBio),
+            "pacbio" => Ok(SequencingTech::PacBio),
+            "pb-hifi" => Ok(SequencingTech::PacBioHifi),
+            "pacbio-hifi" => Ok(SequencingTech::PacBioHifi),
+            x => Err(format!("Unknown protocol type {:}", x)),
+        }
+    }
+}
+
+/// This tells us the value of the filter argument and
+/// the type remembers if it was the default or if the
+/// user provided it explicitly.
+/// TODO: see if there is some built-in clap functionality
+/// to avoid this song and dance.
+#[derive(Debug, Clone, PartialEq, Serialize)]
+pub enum FilterArg {
+    DefaultI64(i64),
+    ProvidedI64(i64),
+    DefaultU32(u32),
+    ProvidedU32(u32),
+    DefaultF32(f32),
+    ProvidedF32(f32),
+}
+
+/// because we have to (in the derive approach at least) round trip
+/// the default values through strings, we need some way to distinguish
+/// a default value from a provided one. Default values will all start
+/// with '*'
+const DEFAULT_FILTER_PREFIX: &str = "*";
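Given the `Display` impl and `parse_filter_*` helpers defined just below, this prefix lets the default/provided distinction survive the string round trip that clap's derive API forces. A small illustrative sketch (not part of the patch):

```rust
// Sketch: defaults render with the '*' marker, so the Default/Provided
// distinction survives a round trip through a string.
let d = FilterArg::DefaultU32(50);
assert_eq!(d.to_string(), "*50");
assert_eq!(parse_filter_u32("*50").unwrap(), FilterArg::DefaultU32(50));
assert_eq!(parse_filter_u32("50").unwrap(), FilterArg::ProvidedU32(50));
```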
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + FilterArg::DefaultI64(x) => write!(f, "{}{}", DEFAULT_FILTER_PREFIX, x), + FilterArg::DefaultU32(x) => write!(f, "{}{}", DEFAULT_FILTER_PREFIX, x), + FilterArg::DefaultF32(x) => write!(f, "{}{}", DEFAULT_FILTER_PREFIX, x), + FilterArg::ProvidedI64(x) => write!(f, "{}", x), + FilterArg::ProvidedU32(x) => write!(f, "{}", x), + FilterArg::ProvidedF32(x) => write!(f, "{}", x), + } + } +} + + +/// Necessary functions on [FilterArg] +impl FilterArg { + /// If it is an i64 type, get the i64 + pub fn try_as_i64(&self) -> anyhow::Result { + match self { + FilterArg::DefaultI64(x) => Ok(*x), + FilterArg::ProvidedI64(x) => Ok(*x), + _ => anyhow::bail!("Could not provide FilterArg variant as an i64") + } + } + + /// If it is an u32 type, get the u32 + pub fn try_as_u32(&self) -> anyhow::Result { + match self { + FilterArg::DefaultU32(x) => Ok(*x), + FilterArg::ProvidedU32(x) => Ok(*x), + _ => anyhow::bail!("Could not provide FilterArg variant as a u32") + } + } + + /// If it is an f32 type, get the f32 + pub fn try_as_f32(&self) -> anyhow::Result { + match self { + FilterArg::DefaultF32(x) => Ok(*x), + FilterArg::ProvidedF32(x) => Ok(*x), + _ => anyhow::bail!("Could not provide FilterArg variant as an f32") + } + } + + /// If the value is user provided, return the value, otherwise + /// return `other` and print the message provided in `msg` to the + /// logger. + pub fn provided_or_u32(&self, msg: &str, other: u32) -> u32 { + match self { + FilterArg::ProvidedU32(x) => { + info!("{} {}", msg, x); + *x + } + _ => other + } + } + + /// If the value is user provided, return the value, otherwise + /// return `other` and print the message provided in `msg` to the + /// logger. + pub fn provided_or_i64(&self, msg: &str, other: i64) -> i64 { + match self { + FilterArg::ProvidedI64(x) => { + info!("{} {}", msg, x); + *x + } + _ => other + } + } + + /// If the value is user provided, return the value, otherwise + /// return `other` and print the message provided in `msg` to the + /// logger. + pub fn provided_or_f32(&self, msg: &str, other: f32) -> f32 { + match self { + FilterArg::ProvidedF32(x) => { + info!("{} {}", msg, x); + *x + } + _ => other + } + } +} + +/// Parse a string as a [FilterArg] with an i64 inner type +fn parse_filter_i64(arg: &str) -> anyhow::Result { + if let Some(val) = arg.strip_prefix(DEFAULT_FILTER_PREFIX) { + let v = val.parse::()?; + Ok(FilterArg::DefaultI64(v)) + } else { + let v = arg.parse::()?; + Ok(FilterArg::ProvidedI64(v)) + } +} + +/// Parse a string as a [FilterArg] with a u32 inner type +fn parse_filter_u32(arg: &str) -> anyhow::Result { + if let Some(val) = arg.strip_prefix(DEFAULT_FILTER_PREFIX) { + let v = val.parse::()?; + Ok(FilterArg::DefaultU32(v)) + } else { + let v = arg.parse::()?; + Ok(FilterArg::ProvidedU32(v)) + } +} + +/// Parse a string as a [FilterArg] with an f32 inner type +fn parse_filter_f32(arg: &str) -> anyhow::Result { + if let Some(val) = arg.strip_prefix(DEFAULT_FILTER_PREFIX) { + let v = val.parse::()?; + Ok(FilterArg::DefaultF32(v)) + } else { + let v = arg.parse::()?; + Ok(FilterArg::ProvidedF32(v)) + } +} + +/// accurate transcript quantification from long-read RNA-seq data +#[derive(Parser, Debug, Serialize)] +#[clap(author, version, about, long_about = None)] +#[command(group( + clap::ArgGroup::new("input") + .required(true) + .args(["alignments", "reads"]) +))] +pub struct Args { + /// be quiet (i.e. 
+
+/// accurate transcript quantification from long-read RNA-seq data
+#[derive(Parser, Debug, Serialize)]
+#[clap(author, version, about, long_about = None)]
+#[command(group(
+    clap::ArgGroup::new("input")
+        .required(true)
+        .args(["alignments", "reads"])
+))]
+pub struct Args {
+    /// be quiet (i.e. don't output log messages that aren't at least warnings)
+    #[arg(long, conflicts_with = "verbose")]
+    pub quiet: bool,
+
+    /// be verbose (i.e. output all non-developer logging messages)
+    #[arg(long)]
+    pub verbose: bool,
+
+    /// path to the file containing the input alignments
+    #[arg(
+        short,
+        long,
+        help_heading = "alignment mode"
+    )]
+    pub alignments: Option<PathBuf>,
+
+    /// path to the file containing the input reads
+    #[arg(
+        long,
+        help_heading = "raw read mode",
+        value_delimiter = ',',
+        requires_ifs([
+            (ArgPredicate::IsPresent, "reference"),
+            (ArgPredicate::IsPresent, "seq_tech")
+        ])
+    )]
+    pub reads: Option<Vec<PathBuf>>,
+
+    /// path to the file containing the reference transcriptome (or existing index) against which
+    /// to map
+    #[arg(
+        long,
+        conflicts_with = "alignments",
+        help_heading = "raw read mode",
+    )]
+    pub reference: Option<PathBuf>,
+
+    /// path where minimap2 index will be written (if provided)
+    #[arg(
+        long,
+        conflicts_with = "alignments",
+        help_heading = "raw read mode",
+    )]
+    pub index_out: Option<PathBuf>,
+
+    /// sequencing technology of the input reads when using raw read (mapping-based) mode
+    #[arg(
+        long,
+        help_heading = "raw read mode",
+        required_unless_present = "alignments",
+        value_parser = clap::value_parser!(SequencingTech)
+    )]
+    pub seq_tech: Option<SequencingTech>,
+
+    /// maximum number of secondary mappings to consider when mapping reads to the transcriptome
+    #[arg(long, default_value_t = 100, requires = "reads", help_heading = "raw read mode")]
+    pub best_n: usize,
+
+    /// location where output quantification file should be written
+    #[arg(short, long, required = true)]
+    pub output: PathBuf,
+
+    #[arg(long, help_heading = "filters", value_enum)]
+    pub filter_group: Option<FilterGroup>,
+
+    /// maximum allowable distance of the right-most end of an alignment from the 3' transcript end
+    #[arg(short, long, help_heading="filters", default_value_t = FilterArg::DefaultI64(u32::MAX as i64), value_parser = parse_filter_i64)]
+    pub three_prime_clip: FilterArg,
+
+    /// maximum allowable distance of the left-most end of an alignment from the 5' transcript end
+    #[arg(short, long, help_heading="filters", default_value_t = FilterArg::DefaultU32(u32::MAX), value_parser = parse_filter_u32)]
+    pub five_prime_clip: FilterArg,
+
+    /// fraction of the best possible alignment score that a secondary alignment must have for
+    /// consideration
+    #[arg(short, long, help_heading = "filters", default_value_t = FilterArg::DefaultF32(0.95), value_parser = parse_filter_f32)]
+    pub score_threshold: FilterArg,
+
+    /// fraction of a query that must be mapped within an alignment to consider the alignment
+    /// valid
+    #[arg(short, long, help_heading = "filters", default_value_t = FilterArg::DefaultF32(0.5), value_parser = parse_filter_f32)]
+    pub min_aligned_fraction: FilterArg,
+
+    /// minimum number of nucleotides in the aligned portion of a read
+    #[arg(short = 'l', long, help_heading = "filters", default_value_t = FilterArg::DefaultU32(50), value_parser = parse_filter_u32)]
+    pub min_aligned_len: FilterArg,
+
+    /// only alignments to this strand will be allowed; options are (fw/+, rc/-, or both/.)
+    #[arg(
+        short = 'd',
+        long,
+        help_heading = "filters",
+        default_value_t = bio_types::strand::Strand::Unknown,
+        value_parser = parse_strand
+    )]
+    pub strand_filter: bio_types::strand::Strand,
+
+    /// input is assumed to be a single-cell BAM and to have the `CB:z` tag for all read records
+    #[arg(long, conflicts_with = "reads")]
+    pub single_cell: bool,
+
+    /// apply the coverage model
+    #[arg(long, help_heading = "coverage model", value_parser)]
+    pub model_coverage: bool,
+
+    /// maximum number of iterations for which to run the EM algorithm
+    #[arg(long, help_heading = "EM", default_value_t = 1000)]
+    pub max_em_iter: u32,
+
+    /// convergence threshold below which the EM algorithm is considered to have converged
+    #[arg(long, help_heading = "EM", default_value_t = 1e-3)]
+    pub convergence_thresh: f64,
+
+    /// maximum number of threads that oarfish can use
+    #[arg(short = 'j', long, default_value_t = 1)]
+    pub threads: usize,
+
+    /// location of short read quantification (if provided)
+    #[arg(short = 'q', long, help_heading = "EM")]
+    pub short_quant: Option<String>,
+
+    /// number of bootstrap replicates to produce to assess quantification uncertainty
+    #[arg(long, default_value_t = 0)]
+    pub num_bootstraps: u32,
+
+    /// width of the bins used in the coverage model
+    #[arg(short, long, help_heading = "coverage model", default_value_t = 100)]
+    pub bin_width: u32,
+
+    /// use a KDE model of the observed fragment length distribution
+    #[arg(short, long, hide = true)]
+    pub use_kde: bool,
+}
diff --git a/src/single_cell.rs b/src/single_cell.rs
new file mode 100644
index 0000000..bf70aba
--- /dev/null
+++ b/src/single_cell.rs
@@ -0,0 +1,260 @@
+use crate::alignment_parser;
+use crate::em;
+use crate::prog_opts::Args;
+use crate::util::oarfish_types::{
+    AlignmentFilters, EMInfo, InMemoryAlignmentStore, TranscriptInfo,
+};
+use crate::util::write_function;
+use crossbeam::queue::ArrayQueue;
+use noodles_bam as bam;
+use noodles_sam::alignment::RecordBuf;
+use path_tools::WithAdditionalExtension;
+use serde_json::json;
+use std::fs::{create_dir_all, File};
+use std::io::{BufRead, Write};
+use std::path::Path;
+use std::sync::{Arc, Mutex};
+use tracing::{error, info};
+
+struct QuantOutputInfo {
+    barcode_file: std::io::BufWriter<File>,
+    row_ids: Vec<u32>,
+    col_ids: Vec<u32>,
+    vals: Vec<f32>,
+    row_index: usize,
+}
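Each worker appends its per-cell counts into these buffers as coordinate-format (row, column, value) triplets, and the triplets are only assembled into a sparse matrix at the end. A minimal sketch of that assembly with `sprs`, using made-up dimensions and values:

```rust
// Illustrative only: assembling COO triplets into a sparse matrix.
fn demo_triplets() {
    // two cells (rows) x three transcripts (columns)
    let row_ids = vec![0u32, 0, 1];
    let col_ids = vec![0u32, 2, 1];
    let vals = vec![3.0f32, 1.0, 2.0];
    let tri = sprs::TriMatI::<f32, u32>::from_triplets((2, 3), row_ids, col_ids, vals);
    // duplicate coordinates (if any) are summed when converting to CSR
    let csr = tri.to_csr::<u32>();
    assert_eq!(csr.nnz(), 3);
}
```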
+
+/// Produce a [serde_json::Value] that encodes the relevant arguments and
+/// parameters of the run that we wish to record to file. Ultimately, this
+/// will be written to the corresponding `meta_info.json` file for this run.
+fn get_single_cell_json_info(args: &Args, seqcol_digest: &str) -> serde_json::Value {
+    let prob = if args.model_coverage {
+        "scaled_binomial"
+    } else {
+        "no_coverage"
+    };
+
+    json!({
+        "prob_model" : prob,
+        "bin_width" : args.bin_width,
+        "alignments": &args.alignments,
+        "output": &args.output,
+        "verbose": &args.verbose,
+        "single_cell": &args.single_cell,
+        "quiet": &args.quiet,
+        "em_max_iter": &args.max_em_iter,
+        "em_convergence_thresh": &args.convergence_thresh,
+        "threads": &args.threads,
+        "filter_group": &args.filter_group,
+        "short_quant": &args.short_quant,
+        "seqcol_digest": seqcol_digest
+    })
+}
+
+pub fn quantify_single_cell_from_collated_bam(
+    header: &noodles_sam::Header,
+    filter_opts: &AlignmentFilters,
+    reader: &mut bam::io::Reader<impl BufRead>,
+    txps: &mut [TranscriptInfo],
+    args: &Args,
+    seqcol_digest: String,
+) -> anyhow::Result<()> {
+    // if there is a parent directory
+    if let Some(p) = args.output.parent() {
+        // unless this was a relative path with one component,
+        // which we should treat as the file prefix, then grab
+        // the non-empty parent and create it.
+        if p != Path::new("") {
+            create_dir_all(p)?;
+        }
+    }
+
+    let nthreads = args.threads;
+    std::thread::scope(|s| {
+        let bc_path = args.output.with_additional_extension(".barcodes.txt");
+        let bc_file = File::create(bc_path)?;
+        let bc_writer = Arc::new(Mutex::new(QuantOutputInfo {
+            barcode_file: std::io::BufWriter::new(bc_file),
+            row_ids: Vec::new(),
+            col_ids: Vec::new(),
+            vals: Vec::new(),
+            row_index: 0usize,
+        }));
+
+        // the element consists of the vector of records corresponding
+        // to this cell, a (read-only) copy of TranscriptInfo which will
+        // be copied and modified by the thread, and the barcode
+        // (represented as a Vec<u8>) for the cell.
+        type QueueElement<'a> = (Vec<RecordBuf>, &'a [TranscriptInfo], Vec<u8>);
+
+        let q: Arc<ArrayQueue<QueueElement>> = Arc::new(ArrayQueue::new(4 * nthreads));
+        let done_parsing = Arc::new(std::sync::atomic::AtomicBool::new(false));
+        let mut thread_handles: Vec<std::thread::ScopedJoinHandle<anyhow::Result<usize>>> =
+            Vec::with_capacity(nthreads);
+
+        for _worker_id in 0..nthreads {
+            let in_q = q.clone();
+            let done_parsing = done_parsing.clone();
+            let num_txps = txps.len();
+            let bc_out = bc_writer.clone();
+            let bin_width = args.bin_width;
+            let filter_opts = filter_opts.clone();
+
+            let handle = s.spawn(move || {
+                let mut col_ids = Vec::with_capacity(num_txps);
+                let mut row_ids = Vec::with_capacity(num_txps);
+                let mut vals = Vec::with_capacity(num_txps);
+                let mut num_cells = 0_usize;
+                let mut records_for_read = Vec::<RecordBuf>::with_capacity(16);
+
+                // while the queue might still be being filled
+                while !done_parsing.load(std::sync::atomic::Ordering::SeqCst) {
+                    // get the next cell
+                    while let Some(elem) = in_q.pop() {
+                        let mut recs = elem.0;
+                        // new copy of txp info for this barcode
+                        let mut txps = Vec::with_capacity(num_txps);
+                        txps.extend_from_slice(elem.1);
+                        // the barcode of this cell
+                        let barcode = elem.2;
+                        // where we will store the relevant alignment records
+                        let mut store = InMemoryAlignmentStore::new(filter_opts.clone(), header);
+
+                        // sort by read name and then parse the records for this cell
+                        alignment_parser::sort_and_parse_barcode_records(
+                            &mut recs,
+                            &mut store,
+                            &mut txps,
+                            &mut records_for_read,
+                        )?;
+
+                        if store.filter_opts.model_coverage {
+                            // obtaining the Cumulative Distribution Function (CDF) for each transcript
+                            crate::binomial_continuous_prob(&mut txps, &bin_width, 1);
+                            // normalize the probabilities for the records of each read
+                            crate::normalize_read_probs(&mut store, &txps, &bin_width);
+                        }
+                        // wrap up all of the relevant information we need for estimation
+                        // in an EMInfo struct and then call the EM algorithm.
+                        let emi = EMInfo {
+                            eq_map: &store,
+                            txp_info: &txps,
+                            max_iter: args.max_em_iter,
+                            convergence_thresh: args.convergence_thresh,
+                            init_abundances: None,
+                            kde_model: None,
+                        };
+                        // run the EM for this cell
+                        let counts = em::em(&emi, 1);
+                        // clear out the vectors where we will store
+                        // the count information for this cell
+                        col_ids.clear();
+                        vals.clear();
+                        for (col_idx, v) in counts.iter().enumerate() {
+                            if *v > 0.0 {
+                                col_ids.push(col_idx as u32);
+                                vals.push((*v) as f32);
+                            }
+                        }
+                        // fill the row ids for this cell; first
+                        // we size the vector to the correct length
+                        // and fill it with 0s and below we
+                        // fill with the appropriate number (i.e. the
+                        // cell/barcode ID).
+                        row_ids.resize(col_ids.len(), 0_u32);
+                        num_cells += 1;
+
+                        let row_index: usize;
+                        {
+                            // grab a lock and fill out the count info for
+                            // this cell.
+                            let writer_deref = bc_out.lock();
+                            let writer = &mut *writer_deref.unwrap();
+                            writeln!(&mut writer.barcode_file, "{}", unsafe {
+                                std::str::from_utf8_unchecked(&barcode)
+                            })?;
+
+                            // get the row index and then increment it
+                            row_index = writer.row_index;
+                            writer.row_index += 1;
+                            row_ids.fill(row_index as u32);
+
+                            writer.col_ids.extend_from_slice(&col_ids);
+                            writer.row_ids.extend_from_slice(&row_ids);
+                            writer.vals.extend_from_slice(&vals);
+                        }
+                    }
+                }
+                Ok(num_cells)
+            });
+            thread_handles.push(handle);
+        }
+
+        // get the data for the next cell
+        let mut peekable_bam_iter = reader.record_bufs(header).peekable();
+        const CB_TAG: [u8; 2] = [b'C', b'B'];
+        let mut num_cells = 0_usize;
+        // parser thread
+        while let Some(next_res) = peekable_bam_iter.peek() {
+            let rec = next_res.as_ref().unwrap();
+            let barcode = match rec.data().get(&CB_TAG) {
+                None => anyhow::bail!("could not get CB tag value"),
+                Some(v) => match v {
+                    noodles_sam::alignment::record_buf::data::field::Value::String(x) => {
+                        x.to_ascii_uppercase()
+                    }
+                    _ => anyhow::bail!("CB tag value had unexpected type!"),
+                },
+            };
+
+            let records_for_barcode =
+                alignment_parser::parse_alignments_for_barcode(&mut peekable_bam_iter, &barcode)?;
+
+            num_cells += 1;
+            if num_cells > 1 && num_cells % 100 == 0 {
+                info!("Processed {} cells.", num_cells);
+            }
+
+            let mut astore = (records_for_barcode, &(*txps), barcode);
+
+            // push the store on to the work queue
+            while let Err(store) = q.push(astore) {
+                astore = store;
+                while q.is_full() {}
+            }
+        }
+
+        done_parsing.store(true, std::sync::atomic::Ordering::SeqCst);
+
+        let mut total_cells = 0_usize;
+        for h in thread_handles {
+            match h.join() {
+                Ok(Ok(nc)) => {
+                    total_cells += nc;
+                }
+                Ok(Err(e)) => {
+                    error!("error result from thread {:?}", e);
+                }
+                Err(_e) => {
+                    error!("thread panicked");
+                }
+            }
+        }
+
+        let trimat = {
+            let writer_deref = bc_writer.lock();
+            let writer = &mut *writer_deref.unwrap();
+            let num_rows = total_cells;
+            sprs::TriMatI::<f32, u32>::from_triplets(
+                (num_rows, txps.len()),
+                writer.row_ids.clone(),
+                writer.col_ids.clone(),
+                writer.vals.clone(),
+            )
+        };
+        let info = get_single_cell_json_info(args, &seqcol_digest);
+        write_function::write_single_cell_output(&args.output, info, header, &trimat)?;
+        Ok(())
+    })
+}
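The handoff between the parser thread and the workers above relies on a bounded, lock-free queue: `ArrayQueue::push` returns the rejected item when the queue is full, so the producer retries until a consumer frees a slot. A self-contained sketch of that pattern (hypothetical `u64` work items stand in for the per-cell record bundles):

```rust
use crossbeam::queue::ArrayQueue;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

fn demo_queue() {
    let q = Arc::new(ArrayQueue::<u64>::new(4));
    let done = Arc::new(AtomicBool::new(false));
    let (qc, dc) = (q.clone(), done.clone());

    // consumer: keep draining until the producer is done AND the queue is empty
    let consumer = std::thread::spawn(move || {
        let mut seen = 0u64;
        while !dc.load(Ordering::SeqCst) || !qc.is_empty() {
            while let Some(x) = qc.pop() {
                seen += x;
            }
        }
        seen
    });

    for mut item in 0..32u64 {
        // push returns Err(item) on a full queue; retry until space opens
        while let Err(rejected) = q.push(item) {
            item = rejected;
            std::hint::spin_loop();
        }
    }
    done.store(true, Ordering::SeqCst);
    assert_eq!(consumer.join().unwrap(), (0..32).sum::<u64>());
}
```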
diff --git a/src/util/binomial_probability.rs b/src/util/binomial_probability.rs
index 31a0f26..0bf1737 100644
--- a/src/util/binomial_probability.rs
+++ b/src/util/binomial_probability.rs
@@ -177,7 +177,7 @@ pub fn binomial_probability(
     normalized_prob
 }
 
-pub fn binomial_continuous_prob(txps: &mut Vec<TranscriptInfo>, bins: &u32, threads: usize) {
+pub fn binomial_continuous_prob(txps: &mut [TranscriptInfo], bins: &u32, threads: usize) {
     use tracing::info;
     use tracing::info_span;
 
diff --git a/src/util/oarfish_types.rs b/src/util/oarfish_types.rs
index 5fa9360..efbc2ca 100644
--- a/src/util/oarfish_types.rs
+++ b/src/util/oarfish_types.rs
@@ -11,12 +11,203 @@ use serde::Serialize;
 use typed_builder::TypedBuilder;
 
 use bio_types::strand::Strand;
+use minimap2_temp as minimap2;
 use noodles_sam as sam;
 use sam::{alignment::record::data::field::tag::Tag as AlnTag, Header};
 
 #[allow(unused_imports)]
 use tracing::{error, info, warn};
 
+pub trait AlnRecordLike {
+    fn opt_sequence_len(&self) -> Option<usize>;
+    fn is_reverse_complemented(&self) -> bool;
+    fn is_unmapped(&self) -> bool;
+    fn ref_id(&self, header: &Header) -> anyhow::Result<usize>;
+    fn aln_span(&self) -> Option<usize>;
+    fn aln_score(&self) -> Option<i64>;
+    fn aln_start(&self) -> u32;
+    fn aln_end(&self) -> u32;
+    fn is_supp(&self) -> bool;
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum CigarOp {
+    Match,
+    Insertion,
+    Deletion,
+    Skip,
+    SoftClip,
+    HardClip,
+    Pad,
+    SequenceMatch,
+    SequenceMismatch,
+}
+
+impl From<u8> for CigarOp {
+    fn from(e: u8) -> Self {
+        match e {
+            0 => CigarOp::Match,
+            1 => CigarOp::Insertion,
+            2 => CigarOp::Deletion,
+            3 => CigarOp::Skip,
+            4 => CigarOp::SoftClip,
+            5 => CigarOp::HardClip,
+            6 => CigarOp::Pad,
+            7 => CigarOp::SequenceMatch,
+            8 => CigarOp::SequenceMismatch,
+            x => {
+                error!("invalid cigar code {}", x);
+                CigarOp::Skip
+            }
+        }
+    }
+}
+
+/// from noodles: https://docs.rs/noodles-sam/latest/src/noodles_sam/alignment/record/cigar/op/kind.rs.html
+impl CigarOp {
+    #[allow(dead_code)]
+    pub fn consumes_read(&self) -> bool {
+        matches!(
+            self,
+            Self::Match
+                | Self::Insertion
+                | Self::SoftClip
+                | Self::SequenceMatch
+                | Self::SequenceMismatch
+        )
+    }
+
+    pub fn consumes_reference(&self) -> bool {
+        matches!(
+            self,
+            Self::Match
+                | Self::Deletion
+                | Self::Skip
+                | Self::SequenceMatch
+                | Self::SequenceMismatch
+        )
+    }
+}
+
+impl AlnRecordLike for minimap2::Mapping {
+    fn opt_sequence_len(&self) -> Option<usize> {
+        self.query_len.map(|x| x.get() as usize)
+    }
+
+    fn is_reverse_complemented(&self) -> bool {
+        self.strand == minimap2::Strand::Reverse
+    }
+
+    fn is_unmapped(&self) -> bool {
+        self.target_name.is_none()
+    }
+
+    fn ref_id(&self, _header: &Header) -> anyhow::Result<usize> {
+        if let Some(ref tgt_name) = self.target_name {
+            let tgt_str = tgt_name.as_bytes();
+            if let Some(id) = _header.reference_sequences().get_index_of(tgt_str) {
+                return Ok(id);
+            }
+            anyhow::bail!("Could not get ref_id of target {}", tgt_name);
+        }
+        anyhow::bail!("Could not get ref_id of mapping without target name")
+    }
+
+    fn aln_span(&self) -> Option<usize> {
+        if let Some(ref aln) = self.alignment {
+            if let Some(ref cigar) = aln.cigar {
+                let mut span = 0_usize;
+                for (len, op) in cigar.iter() {
+                    let co: CigarOp = (*op).into();
+                    if co.consumes_reference() {
+                        span += *len as usize;
+                    }
+                }
+                return Some(span);
+            }
+            error!("Had an alignment but no CIGAR!");
+            return None;
+        }
+        None
+    }
+
+    fn aln_score(&self) -> Option<i64> {
+        if let Some(ref aln) = self.alignment {
+            aln.alignment_score.map(|x| x as i64)
+        } else {
+            None
+        }
+    }
+
+    fn aln_start(&self) -> u32 {
+        self.target_start.saturating_add(1) as u32
+    }
+
+    fn aln_end(&self) -> u32 {
+        self.target_end.saturating_add(1) as u32
+    }
+
+    fn is_supp(&self) -> bool {
+        self.is_supplementary
+    }
+}
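The `aln_span` implementation above derives the reference span purely from the CIGAR, summing only reference-consuming operations. A small illustrative helper in the same spirit (`ref_span` is hypothetical; it assumes the `CigarOp` enum from this patch):

```rust
// Illustrative only: reference span from (length, op-code) pairs.
fn ref_span(cigar: &[(u32, u8)]) -> usize {
    cigar
        .iter()
        .map(|(len, op)| {
            let co: CigarOp = (*op).into();
            // only reference-consuming ops (M/D/N/=/X) contribute
            if co.consumes_reference() {
                *len as usize
            } else {
                0
            }
        })
        .sum()
}

// 10M 2I 3D covers 10 + 3 = 13 reference bases; the insertion adds nothing:
// assert_eq!(ref_span(&[(10, 0), (2, 1), (3, 2)]), 13);
```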
+
+pub trait NoodlesAlignmentLike {}
+impl NoodlesAlignmentLike for noodles_sam::alignment::record_buf::RecordBuf {}
+
+/// implement the AlnRecordLike trait for the underlying noodles Record type
+impl<T: NoodlesAlignmentLike + sam::alignment::Record> AlnRecordLike for T {
+    fn opt_sequence_len(&self) -> Option<usize> {
+        Some(self.sequence().len())
+    }
+
+    fn is_reverse_complemented(&self) -> bool {
+        self.flags().expect("valid flags").is_reverse_complemented()
+    }
+
+    fn is_unmapped(&self) -> bool {
+        self.flags()
+            .expect("alignment record should have flags")
+            .is_unmapped()
+    }
+
+    fn ref_id(&self, header: &Header) -> anyhow::Result<usize> {
+        self.reference_sequence_id(header)
+            .unwrap()
+            .map_err(anyhow::Error::from)
+    }
+
+    fn aln_span(&self) -> Option<usize> {
+        self.alignment_span().expect("valid span")
+    }
+
+    fn aln_score(&self) -> Option<i64> {
+        self.data()
+            .get(&AlnTag::ALIGNMENT_SCORE)
+            .unwrap()
+            .expect("could not get value")
+            .as_int()
+    }
+
+    fn aln_start(&self) -> u32 {
+        self.alignment_start()
+            .unwrap()
+            .expect("valid aln start")
+            .get() as u32
+    }
+
+    fn aln_end(&self) -> u32 {
+        self.alignment_end()
+            .unwrap()
+            .expect("valid aln end")
+            .get() as u32
+    }
+
+    fn is_supp(&self) -> bool {
+        self.flags()
+            .expect("alignment record should have flags")
+            .is_supplementary()
+    }
+}
+
 #[derive(Clone, Debug, PartialEq)]
 pub struct AlnInfo {
     pub ref_id: u32,
@@ -34,23 +225,13 @@ impl AlnInfo {
 }
 
 impl AlnInfo {
-    fn from_noodles_record<T: sam::alignment::Record>(
-        aln: &T,
-        aln_header: &Header,
-    ) -> Self {
+    fn from_aln_rec_like<T: AlnRecordLike>(aln: &T, aln_header: &Header) -> Self {
         Self {
-            ref_id: aln
-                .reference_sequence_id(aln_header)
-                .unwrap()
-                .expect("valid reference id") as u32,
-            start: aln
-                .alignment_start()
-                .unwrap()
-                .expect("valid aln start")
-                .get() as u32,
-            end: aln.alignment_end().unwrap().expect("valid aln end").get() as u32,
+            ref_id: aln.ref_id(aln_header).expect("valid ref_id") as u32,
+            start: aln.aln_start(),
+            end: aln.aln_end(),
             prob: 0.0_f64,
-            strand: if aln.flags().expect("valid flags").is_reverse_complemented() {
+            strand: if aln.is_reverse_complemented() {
                 Strand::Reverse
             } else {
                 Strand::Forward
             },
         }
     }
 }
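With the blanket impl in place, the same downstream code can consume either decoded BAM records or in-memory `minimap2` mappings. An illustrative generic helper over the trait (`summarize` is hypothetical, not part of the patch):

```rust
// Illustrative only: works for any type implementing `AlnRecordLike`.
fn summarize<T: AlnRecordLike>(recs: &[T], header: &Header) -> (usize, Option<i64>) {
    // alignments that are mapped and resolve to a known target sequence
    let mapped = recs
        .iter()
        .filter(|r| !r.is_unmapped() && r.ref_id(header).is_ok())
        .count();
    // the best aligner-reported score among all alignments, if any carry one
    let best = recs.iter().filter_map(|r| r.aln_score()).max();
    (mapped, best)
}
```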
@@ -107,7 +288,7 @@ pub struct EMInfo<'eqm, 'tinfo, 'h> {
     // perform the EM
     pub eq_map: &'eqm InMemoryAlignmentStore<'h>,
     // relevant information about each target transcript
-    pub txp_info: &'tinfo Vec<TranscriptInfo>,
+    pub txp_info: &'tinfo [TranscriptInfo],
     // maximum number of iterations the EM will run
     // before returning an estimate.
     pub max_iter: u32,
@@ -124,7 +305,7 @@ pub struct EMInfo<'eqm, 'tinfo, 'h> {
     pub kde_model: Option<KDEModel>,
 }
 
-#[derive(Debug, PartialEq)]
+#[derive(Clone, Debug, PartialEq)]
 pub struct TranscriptInfo {
     pub len: NonZeroUsize,
     pub total_weight: f64,
@@ -254,6 +435,10 @@ impl<'h> InMemoryAlignmentStore<'h> {
     pub fn len(&self) -> usize {
         self.boundaries.len().saturating_sub(1)
     }
+
+    pub fn aggregate_discard_table(&mut self, table: &DiscardTable) {
+        self.discard_table.aggregate(table);
+    }
 }
 
 pub struct InMemoryAlignmentStoreSamplingWithReplacementIter<'a, 'h, 'b> {
@@ -359,7 +544,8 @@ impl<'h> InMemoryAlignmentStore<'h> {
         }
     }
 
-    pub fn add_group<T: sam::alignment::Record>(
+    #[inline(always)]
+    pub fn add_group<T: AlnRecordLike>(
         &mut self,
         txps: &mut [TranscriptInfo],
         ag: &mut Vec<T>,
@@ -367,19 +553,26 @@ impl<'h> InMemoryAlignmentStore<'h> {
     ) {
         let (alns, as_probs) =
             self.filter_opts
                .filter(&mut self.discard_table, self.aln_header, txps, ag);
+        self.add_filtered_group(&alns, &as_probs, txps);
+    }
+
+    #[inline(always)]
+    pub fn add_filtered_group(
+        &mut self,
+        alns: &[AlnInfo],
+        as_probs: &[f32],
+        txps: &mut [TranscriptInfo],
+    ) {
         if !alns.is_empty() {
-            self.alignments.extend_from_slice(&alns);
-            self.as_probabilities.extend_from_slice(&as_probs);
+            for a in alns.iter() {
+                let tid = a.ref_id as usize;
+                txps[tid].add_interval(a.start, a.end, 1.0_f64);
+            }
+            self.alignments.extend_from_slice(alns);
+            self.as_probabilities.extend_from_slice(as_probs);
             self.coverage_probabilities
                 .extend(vec![0.0_f64; alns.len()]);
             self.boundaries.push(self.alignments.len());
-            // @Susan-Zare : this is making things unreasonably slow. Perhaps
-            // we should avoid pushing actual ranges, and just compute the
-            // contribution of each range to the coverage online.
-            //for a in alns {
-            //    txps[a.ref_id as usize].ranges.push(a.start..a.end);
-            //}
-            // }
         }
     }
@@ -390,7 +583,7 @@ impl<'h> InMemoryAlignmentStore<'h> {
 
     #[inline(always)]
     pub fn num_aligned_reads(&self) -> usize {
-        self.len().saturating_sub(1)
+        self.len()
     }
 
     #[inline(always)]
@@ -406,7 +599,7 @@ impl<'h> InMemoryAlignmentStore<'h> {
 
 /// The parameters controlling the filters that will
 /// be applied to alignments
-#[derive(TypedBuilder, Debug, Serialize)]
+#[derive(TypedBuilder, Clone, Debug, Serialize)]
 pub struct AlignmentFilters {
     /// How far an alignment can start from the
     /// 5' end of the transcript and still be
@@ -454,7 +647,7 @@ pub struct DiscardTable {
 }
 
 impl DiscardTable {
-    fn new() -> Self {
+    pub fn new() -> Self {
         DiscardTable {
             discard_5p: 0,
             discard_3p: 0,
@@ -466,6 +659,17 @@ impl DiscardTable {
             valid_best_aln: 0,
         }
     }
+
+    pub fn aggregate(&mut self, other: &Self) {
+        self.discard_5p += other.discard_5p;
+        self.discard_3p += other.discard_3p;
+        self.discard_score += other.discard_score;
+        self.discard_aln_frac += other.discard_aln_frac;
+        self.discard_aln_len += other.discard_aln_len;
+        self.discard_ori += other.discard_ori;
+        self.discard_supp += other.discard_supp;
+        self.valid_best_aln += other.valid_best_aln;
+    }
 }
 
 impl DiscardTable {
@@ -552,11 +756,11 @@ impl AlignmentFilters {
     ///
     /// This function returns a vector of the `AlnInfo` structs for alignments
     /// that pass the filter, and the associated probabilities for each.
-    fn filter<T: sam::alignment::Record>(
+    pub fn filter<T: AlnRecordLike>(
         &mut self,
         discard_table: &mut DiscardTable,
         aln_header: &Header,
-        txps: &mut [TranscriptInfo],
+        txps: &[TranscriptInfo],
         ag: &mut Vec<T>,
     ) -> (Vec<AlnInfo>, Vec<f32>) {
         // track the best score of any alignment we've seen
@@ -575,46 +779,23 @@ impl AlignmentFilters {
         // so, here we explicitly look for a non-zero sequence.
         let seq_len = ag
             .iter()
-            .find_map(|x| {
-                let l = x.sequence().len();
-                if l > 0 {
-                    Some(l as u32)
-                } else {
-                    None
-                }
-            })
+            .find_map(|x| x.opt_sequence_len().map(|y| y as u32))
             .unwrap_or(0_u32);
 
         // apply the filter criteria to determine what alignments to retain
         ag.retain(|x| {
             // we only want to retain mapped reads
-            if !x
-                .flags()
-                .expect("alignment record should have flags")
-                .is_unmapped()
-            {
-                let tid = x
-                    .reference_sequence_id(aln_header)
-                    .unwrap()
-                    .expect("valid tid");
+            if !x.is_unmapped() {
+                let tid = x.ref_id(aln_header).expect("valid ref id");
 
                 // get the alignment span
-                let aln_span = x.alignment_span().expect("valid span").unwrap() as u32;
+                let aln_span = x.aln_span().unwrap() as u32;
 
                 // get the alignment score, as computed by the aligner
-                let score = x
-                    .data()
-                    .get(&AlnTag::ALIGNMENT_SCORE)
-                    .unwrap()
-                    .expect("could not get value")
-                    .as_int()
-                    .unwrap_or(i32::MIN as i64) as i32;
+                let score = x.aln_score().unwrap_or(i32::MIN as i64) as i32;
 
                 // the alignment is to the - strand
-                let is_rc = x
-                    .flags()
-                    .expect("alignment record should have flags")
-                    .is_reverse_complemented();
+                let is_rc = x.is_reverse_complemented();
 
                 // if we are keeping only forward strand alignments
                 // filter this alignment if it is rc
@@ -639,10 +820,7 @@ impl AlignmentFilters {
                 // the alignment is supplementary
                 // *NOTE*: this removes "supplementary" alignments, *not*
                 // "secondary" alignments.
-                let is_supp = x
-                    .flags()
-                    .expect("alignment record should have flags")
-                    .is_supplementary();
+                let is_supp = x.is_supp();
                 if is_supp {
                     discard_table.discard_supp += 1;
                     return false;
                 }
@@ -656,24 +834,15 @@ impl AlignmentFilters {
                 }
 
                 // not too far from the 3' end
-                let filt_3p = (x
-                    .alignment_end()
-                    .unwrap()
-                    .expect("alignment record should have end position")
-                    .get() as i64)
-                    <= (txps[tid].len.get() as i64 - self.three_prime_clip);
+                let filt_3p =
+                    (x.aln_end() as i64) <= (txps[tid].len.get() as i64 - self.three_prime_clip);
                 if filt_3p {
                     discard_table.discard_3p += 1;
                     return false;
                 }
 
                 // not too far from the 5' end
-                let filt_5p = (x
-                    .alignment_start()
-                    .unwrap()
-                    .expect("alignment record should have a start position")
-                    .get() as u32)
-                    >= self.five_prime_clip;
+                let filt_5p = x.aln_start() >= self.five_prime_clip;
                 if filt_5p {
                     discard_table.discard_5p += 1;
                     return false;
                 }
@@ -720,19 +889,12 @@ impl AlignmentFilters {
         // get a vector of all of the scores
         let mut scores: Vec<i32> = ag
             .iter_mut()
-            .map(|a| {
-                a.data()
-                    .get(&AlnTag::ALIGNMENT_SCORE)
-                    .unwrap()
-                    .expect("could not get value")
-                    .as_int()
-                    .unwrap_or(0) as i32
-            })
+            .map(|a| a.aln_score().unwrap_or(0) as i32)
             .collect();
 
         let _min_allowed_score = self.score_threshold * mscore;
 
-        for (i, score) in scores.iter_mut().enumerate() {
+        for score in scores.iter_mut() {
             const SCORE_PROB_DENOM: f32 = 5.0;
             let fscore = *score as f32;
             let score_ok = (fscore * inv_max_score) >= self.score_threshold; //>= thresh_score;
@@ -740,28 +902,6 @@ impl AlignmentFilters {
             if score_ok {
                 //let f = ((fscore - mscore) / (mscore - min_allowed_score)) * SCORE_PROB_DENOM;
                 let f = (fscore - mscore) / SCORE_PROB_DENOM;
                 probabilities.push(f.exp());
-
-                let tid = ag[i]
-                    .reference_sequence_id(aln_header)
-                    .unwrap()
-                    .expect("valid transcript id");
-
-                // since we are retaining this alignment, then
-                // add it to the coverage of the corresponding
-                // transcript.
-                txps[tid].add_interval(
-                    ag[i]
-                        .alignment_start()
-                        .unwrap()
-                        .expect("valid alignment start")
-                        .get() as u32,
-                    ag[i]
-                        .alignment_end()
-                        .unwrap()
-                        .expect("valid alignment end")
-                        .get() as u32,
-                    1.0_f64,
-                );
             } else {
                 *score = i32::MIN;
                 discard_table.discard_score += 1;
             }
@@ -774,7 +914,7 @@ impl AlignmentFilters {
 
         (
             ag.iter()
-                .map(|x| AlnInfo::from_noodles_record(x, aln_header))
+                .map(|x| AlnInfo::from_aln_rec_like(x, aln_header))
                 .collect(),
             probabilities,
         )
diff --git a/src/util/write_function.rs b/src/util/write_function.rs
index b66d7ac..451f3b3 100644
--- a/src/util/write_function.rs
+++ b/src/util/write_function.rs
@@ -17,6 +17,53 @@ use std::{
     io::{self, BufWriter, Write},
 };
 
+pub fn write_single_cell_output(
+    output: &PathBuf,
+    info: serde_json::Value,
+    header: &noodles_sam::header::Header,
+    counts: &sprs::TriMatI<f32, u32>,
+) -> io::Result<()> {
+    // if there is a parent directory
+    if let Some(p) = output.parent() {
+        // unless this was a relative path with one component,
+        // which we should treat as the file prefix, then grab
+        // the non-empty parent and create it.
+        if p != Path::new("") {
+            create_dir_all(p)?;
+        }
+    }
+
+    {
+        let info_path = output.with_additional_extension(".meta_info.json");
+        let write = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(info_path)
+            .expect("Couldn't create output file");
+
+        serde_json::ser::to_writer_pretty(write, &info)?;
+    }
+
+    let out_path = output.with_additional_extension(".count.mtx");
+    sprs::io::write_matrix_market(out_path, counts)?;
+
+    let out_path = output.with_additional_extension(".features.txt");
+    let write = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open(out_path)
+        .expect("Couldn't create output file");
+    let mut writer = BufWriter::new(write);
+
+    for (rseq, _rmap) in header.reference_sequences().iter() {
+        writeln!(writer, "{}", rseq).expect("Couldn't write to output file.");
+    }
+    Ok(())
+}
+
 // this part is taken from dev branch
 pub fn write_output(
     output: &PathBuf,