diff --git a/snapatac2-core/Cargo.toml b/snapatac2-core/Cargo.toml index c0202866e..1f4a27c03 100644 --- a/snapatac2-core/Cargo.toml +++ b/snapatac2-core/Cargo.toml @@ -15,7 +15,7 @@ anndata = { git = "https://github.com/kaizhang/anndata-rs.git", rev = "7279f0a8a anyhow = "1.0" bigtools = "0.2" bincode = "1.3" -bed-utils = { git = "https://github.com/kaizhang/bed-utils.git", rev = "19832eda2909dbe87289e09ba966af0797adb9f8" } +bed-utils = { git = "https://github.com/kaizhang/bed-utils.git", rev = "27ae1ee81db4ac84434014877b8d17aa22583df2" } extsort = "0.4" either = "1.8" flate2 = "1.0" diff --git a/snapatac2-core/src/export.rs b/snapatac2-core/src/export.rs index ce79c28c1..d4b19ada5 100644 --- a/snapatac2-core/src/export.rs +++ b/snapatac2-core/src/export.rs @@ -61,7 +61,7 @@ pub trait Exporter: SnapData { if let Some(barcodes_) = barcodes { let bc = barcodes_[start + i]; entry.extend(xs.into_iter().map(|mut x| { - x.name = Some(bc.to_string()); + x.barcode = Some(bc.to_string()); x })); } else { diff --git a/snapatac2-core/src/preprocessing/bam.rs b/snapatac2-core/src/preprocessing/bam.rs index 1508bd32d..e189be456 100644 --- a/snapatac2-core/src/preprocessing/bam.rs +++ b/snapatac2-core/src/preprocessing/bam.rs @@ -83,7 +83,7 @@ pub fn make_fragment_file, P2: AsRef>( let header: Header = fix_header(reader.read_header().unwrap()).parse().unwrap(); reader.read_reference_sequences().unwrap(); - let f = File::create(output_file.as_ref().clone()).expect("cannot create the output file"); + let f = File::create(output_file.as_ref()).expect("cannot create the output file"); let mut output: Box = if output_file.as_ref().extension().and_then(|x| x.to_str()) == Some("gz") { Box::new(GzEncoder::new(BufWriter::new(f), Compression::default())) diff --git a/snapatac2-core/src/preprocessing/count_data.rs b/snapatac2-core/src/preprocessing/count_data.rs index cea07fd1e..d5ab3d073 100644 --- a/snapatac2-core/src/preprocessing/count_data.rs +++ b/snapatac2-core/src/preprocessing/count_data.rs @@ -84,7 +84,7 @@ pub trait SnapData: AnnDataOp { /// Compute the fraction of reads in each region. fn frip(&self, regions: &Vec>) -> Result> { - let vec = qc::fraction_in_region(self.get_count_iter(2000)?.into_raw(), regions) + let vec = qc::fraction_of_reads_in_region(self.get_count_iter(2000)?.into_raw(), regions) .map(|x| x.0).flatten().flatten().collect::>(); Array2::from_shape_vec((self.n_obs(), regions.len()), vec).map_err(Into::into) } diff --git a/snapatac2-core/src/preprocessing/count_data/coverage.rs b/snapatac2-core/src/preprocessing/count_data/coverage.rs index c9492a21a..9c3708369 100644 --- a/snapatac2-core/src/preprocessing/count_data/coverage.rs +++ b/snapatac2-core/src/preprocessing/count_data/coverage.rs @@ -1,8 +1,8 @@ -use crate::preprocessing::count_data::genome::{FeatureCounter, GenomeBaseIndex, ChromSizes}; +use crate::preprocessing::{count_data::genome::{FeatureCounter, GenomeBaseIndex, ChromSizes}, Fragment}; use std::collections::HashMap; use anndata::data::{utils::to_csr_data, CsrNonCanonical}; -use bed_utils::bed::{BEDLike, BED, Strand, GenomicRange}; +use bed_utils::bed::{BEDLike, Strand, GenomicRange}; use nalgebra_sparse::{CsrMatrix, pattern::SparsityPattern}; use num::traits::{FromPrimitive, One, Zero, SaturatingAdd}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; @@ -70,7 +70,7 @@ where self } - pub fn into_raw(self) -> impl ExactSizeIterator>>, usize, usize)> { + pub fn into_raw(self) -> impl ExactSizeIterator>, usize, usize)> { let index = self.index; self.coverage.map(move |(raw_mat, a, b)| { let beds = match raw_mat { @@ -85,25 +85,23 @@ where let size = values[j]; let (chrom, pos) = index.get_position(col_indices[j]); if size > 0 { - BED::new( - chrom, - pos, - pos + size as u64, - None, - None, - Some(Strand::Forward), - Default::default(), - ) + Fragment { + chrom: chrom.to_string(), + start: pos, + end: pos + size as u64, + barcode: None, + count: 1, + strand: Some(Strand::Forward), + } } else { - BED::new( - chrom, - pos + 1 - size.abs() as u64, - pos + 1, - None, - None, - Some(Strand::Reverse), - Default::default(), - ) + Fragment { + chrom: chrom.to_string(), + start: pos + 1 - size.abs() as u64, + end: pos + 1, + barcode: None, + count: 1, + strand: Some(Strand::Reverse), + } } }).collect() }).collect() @@ -118,12 +116,14 @@ where (row_start..row_end).map(|j| { let size = values[j]; let (chrom, start) = index.get_position(col_indices[j]); - BED::new( - chrom, + Fragment { + chrom: chrom.to_string(), start, - start + size as u64, - None, None, None, Default::default(), - ) + end: start + size as u64, + barcode: None, + count: 1, + strand: None, + } }).collect() }).collect() }, @@ -132,7 +132,7 @@ where }) } - pub fn into_raw_groups(self, key: F) -> impl ExactSizeIterator>>> + pub fn into_raw_groups(self, key: F) -> impl ExactSizeIterator>> where F: Fn(usize) -> K, K: Eq + PartialEq + std::hash::Hash, @@ -146,7 +146,6 @@ where .or_insert_with(Vec::new) .extend(xs.into_iter()); }); - ordered }) } diff --git a/snapatac2-core/src/preprocessing/count_data/import.rs b/snapatac2-core/src/preprocessing/count_data/import.rs index 9ab689f78..4355db076 100644 --- a/snapatac2-core/src/preprocessing/count_data/import.rs +++ b/snapatac2-core/src/preprocessing/count_data/import.rs @@ -62,7 +62,7 @@ where anndata.obsm().add_iter( obsm_key, fragments - .group_by(|x| x.barcode.clone()) + .group_by(|x| x.name().unwrap().to_string()) .into_iter() .progress_with(spinner) .filter(|(key, _)| white_list.map_or(true, |x| x.contains(key))) diff --git a/snapatac2-core/src/preprocessing/qc.rs b/snapatac2-core/src/preprocessing/qc.rs index 37535b25e..f6208877c 100644 --- a/snapatac2-core/src/preprocessing/qc.rs +++ b/snapatac2-core/src/preprocessing/qc.rs @@ -1,13 +1,11 @@ use std::{io::{Read, BufRead, BufReader}, ops::Div, collections::{HashMap, HashSet}}; use anndata::data::CsrNonCanonical; -use bed_utils::bed::{ - GenomicRange, BEDLike, tree::BedTree, - ParseError, Strand, BED, -}; +use bed_utils::bed::{GenomicRange, BEDLike, tree::BedTree, ParseError, Strand}; use anyhow::Result; use serde::{Serialize, Deserialize}; use extsort::sorter::Sortable; use bincode; +use smallvec::{SmallVec, smallvec}; pub type CellBarcode = String; @@ -18,7 +16,7 @@ pub struct Fragment { pub chrom: String, pub start: u64, pub end: u64, - pub barcode: CellBarcode, + pub barcode: Option, pub count: u32, pub strand: Option, } @@ -35,16 +33,16 @@ impl Sortable for Fragment { } impl Fragment { - pub fn to_insertions(&self) -> Vec { + pub fn to_reads(&self) -> SmallVec<[GenomicRange; 2]> { match self.strand { - None => vec![ + None => smallvec![ GenomicRange::new(self.chrom.clone(), self.start, self.start + 1), GenomicRange::new(self.chrom.clone(), self.end - 1, self.end), ], - Some(Strand::Forward) => vec![ + Some(Strand::Forward) => smallvec![ GenomicRange::new(self.chrom.clone(), self.start, self.start + 1) ], - Some(Strand::Reverse) => vec![ + Some(Strand::Reverse) => smallvec![ GenomicRange::new(self.chrom.clone(), self.end - 1, self.end) ], } @@ -67,9 +65,33 @@ impl BEDLike for Fragment { self.end = end; self } - fn name(&self) -> Option<&str> { None } - fn score(&self) -> Option { None } - fn strand(&self) -> Option { None } + fn name(&self) -> Option<&str> { + self.barcode.as_deref() + } + fn score(&self) -> Option { + Some(self.count.try_into().unwrap()) + } + fn strand(&self) -> Option { + self.strand + } +} + +impl core::fmt::Display for Fragment { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!( + f, + "{}\t{}\t{}\t{}\t{}", + self.chrom(), + self.start(), + self.end(), + self.barcode.as_deref().unwrap_or("."), + self.count, + )?; + if let Some(strand) = self.strand() { + write!(f, "\t{}", strand)?; + } + Ok(()) + } } impl std::str::FromStr for Fragment { @@ -82,8 +104,13 @@ impl std::str::FromStr for Fragment { .and_then(|s| lexical::parse(s).map_err(ParseError::InvalidStartPosition))?; let end = fields.next().ok_or(ParseError::MissingEndPosition) .and_then(|s| lexical::parse(s).map_err(ParseError::InvalidEndPosition))?; - let barcode = fields.next().ok_or(ParseError::MissingName) - .map(|s| s.into())?; + let barcode = fields + .next() + .ok_or(ParseError::MissingName) + .map(|s| match s { + "." => None, + _ => Some(s.into()), + })?; let count = fields.next().map_or(Ok(1), |s| if s == "." { Ok(1) } else { @@ -245,7 +272,7 @@ where { let mut barcodes = HashMap::new(); fragments.for_each(|frag| { - let key = frag.barcode.clone(); + let key = frag.barcode.unwrap().clone(); *barcodes.entry(key).or_insert(0) += 1; }); barcodes @@ -253,7 +280,7 @@ where pub fn tss_enrichment(fragments: I, promoter: &BedTree) -> f64 where - I: Iterator>, + I: Iterator, { fn find_pos<'a>(promoter: &'a BedTree, ins: &'a GenomicRange) -> impl Iterator + 'a { promoter.find(ins).map(|(entry, data)| { @@ -316,25 +343,28 @@ pub fn fragment_size_distribution(data: I, max_size: usize) -> Vec size_dist } -/// Count the fraction of the records in the given regions. -pub fn fraction_in_region<'a, I, D>( +/// Count the fraction of the reads in the given regions. +pub fn fraction_of_reads_in_region<'a, I, D>( iter: I, regions: &'a Vec>, ) -> impl Iterator>, usize, usize)> + 'a where - I: Iterator>>, usize, usize)> + 'a, + I: Iterator>, usize, usize)> + 'a, { let k = regions.len(); - iter.map(move |(beds, start, end)| { - let frac = beds.into_iter().map(|xs| { - let sum = xs.len() as f64; + iter.map(move |(data, start, end)| { + let frac = data.into_iter().map(|fragments| { + let mut sum = 0.0; let mut counts = vec![0.0; k]; - xs.into_iter().for_each(|x| - regions.iter().enumerate().for_each(|(i, r)| { - if r.is_overlapped(&x) { - counts[i] += 1.0; - } - }) - ); + fragments + .into_iter().flat_map(|fragment| fragment.to_reads()) + .for_each(|read| { + sum += 1.0; + regions.iter().enumerate().for_each(|(i, r)| + if r.is_overlapped(&read) { + counts[i] += 1.0; + } + ) + }); counts.iter_mut().for_each(|x| *x /= sum); counts }).collect::>(); diff --git a/snapatac2-python/Cargo.toml b/snapatac2-python/Cargo.toml index 92979a5a3..e645581b7 100644 --- a/snapatac2-python/Cargo.toml +++ b/snapatac2-python/Cargo.toml @@ -18,7 +18,7 @@ anndata-hdf5 = { git = "https://github.com/kaizhang/anndata-rs.git", rev = "7279 pyanndata = { git = "https://github.com/kaizhang/anndata-rs.git", rev = "7279f0a8a2bf8a40aef5067af6dd2b4dca44a437" } extsort = "0.4" anyhow = "1.0" -bed-utils = { git = "https://github.com/kaizhang/bed-utils.git", rev = "19832eda2909dbe87289e09ba966af0797adb9f8" } +bed-utils = { git = "https://github.com/kaizhang/bed-utils.git", rev = "27ae1ee81db4ac84434014877b8d17aa22583df2" } flate2 = "1.0" itertools = "0.11" indicatif = "0.17" diff --git a/snapatac2-python/snapatac2/_utils.py b/snapatac2-python/snapatac2/_utils.py index 63ba89890..e06ff934c 100644 --- a/snapatac2-python/snapatac2/_utils.py +++ b/snapatac2-python/snapatac2/_utils.py @@ -1,11 +1,11 @@ import numpy as np -import anndata as ad import logging -from snapatac2._snapatac2 import AnnData, AnnDataSet, read +from anndata import AnnData +import snapatac2._snapatac2 as internal def is_anndata(data) -> bool: - return isinstance(data, ad.AnnData) or isinstance(data, AnnData) or isinstance(data, AnnDataSet) + return isinstance(data, AnnData) or isinstance(data, internal.AnnData) or isinstance(data, internal.AnnDataSet) def anndata_par(adatas, func, n_jobs=4): return anndata_ipar(list(enumerate(adatas)), lambda x: func(x[1]), n_jobs=n_jobs) @@ -15,7 +15,7 @@ def anndata_ipar(inputs, func, n_jobs=4): exist_in_memory_adata = False for _, adata in inputs: - if isinstance(adata, ad.AnnData): + if isinstance(adata, AnnData): exist_in_memory_adata = True break if exist_in_memory_adata: @@ -27,7 +27,7 @@ def anndata_ipar(inputs, func, n_jobs=4): from multiprocess import get_context def _func(x): - adata = read(x[1]) + adata = internal.read(x[1]) result = func((x[0], adata)) adata.close() return result diff --git a/snapatac2-python/snapatac2/datasets.py b/snapatac2-python/snapatac2/datasets.py index 6a0dd5dad..eaa84ed70 100644 --- a/snapatac2-python/snapatac2/datasets.py +++ b/snapatac2-python/snapatac2/datasets.py @@ -127,11 +127,11 @@ def pbmc5k(type: Literal["fragment, h5ad, annotated_h5ad"] = "fragment") -> Path path to the file. """ if type == "fragment": - return Path(datasets().fetch("atac_pbmc_5k.tsv.gz")) + return Path(datasets().fetch("atac_pbmc_5k.tsv.gz", progressbar=True)) elif type == "h5ad": - return Path(datasets().fetch("atac_pbmc_5k.h5ad")) + return Path(datasets().fetch("atac_pbmc_5k.h5ad", progressbar=True)) elif type == "annotated_h5ad": - return Path(datasets().fetch("atac_pbmc_5k_annotated.h5ad")) + return Path(datasets().fetch("atac_pbmc_5k_annotated.h5ad", progressbar=True)) else: raise NameError("type '{}' is not available.".format(type)) @@ -175,7 +175,7 @@ def colon() -> list[tuple[str, Path]]: list[tuple[str, Path]] A list of tuples, each tuple contains the sample name and the path to the fragment file. """ - files = datasets().fetch("colon_transverse.tar", processor = pooch.Untar()) + files = datasets().fetch("colon_transverse.tar", progressbar=True, processor = pooch.Untar()) return [(fl.split("/")[-1].split("_rep1_fragments")[0], Path(fl)) for fl in files] def cre_HEA() -> Path: diff --git a/snapatac2-python/snapatac2/genome.py b/snapatac2-python/snapatac2/genome.py index 17a4837bf..5b2a1e746 100644 --- a/snapatac2-python/snapatac2/genome.py +++ b/snapatac2-python/snapatac2/genome.py @@ -8,10 +8,13 @@ def __init__(self, chrom_sizes, annotation_filename, fasta = None) -> None: self._fasta_filename = fasta def fetch_annotations(self): - return datasets().fetch(self._annotation_filename) + return datasets().fetch(self._annotation_filename, progressbar=True) def fetch_fasta(self): - return datasets().fetch(self._fasta_filename, processor = Decompress(method = "gzip")) + return datasets().fetch( + self._fasta_filename, + processor=Decompress(method = "gzip"), + progressbar=True) GRCh37 = Genome( { diff --git a/snapatac2-python/snapatac2/metrics/__init__.py b/snapatac2-python/snapatac2/metrics/__init__.py index caa9b0230..4c31f91cc 100644 --- a/snapatac2-python/snapatac2/metrics/__init__.py +++ b/snapatac2-python/snapatac2/metrics/__init__.py @@ -4,12 +4,11 @@ import numpy as np import snapatac2 -from snapatac2._snapatac2 import AnnData, AnnDataSet import snapatac2._snapatac2 as internal from snapatac2.genome import Genome def tsse( - adata: AnnData | list[AnnData], + adata: internal.AnnData | list[internal.AnnData], gene_anno: Genome | Path, *, inplace: bool = True, @@ -39,13 +38,26 @@ def tsse( np.ndarray | list[np.ndarray] | None If `inplace = True`, directly adds the results to `adata.obs`. Otherwise return the results. + + Examples + -------- + >>> import snapatac2 as snap + >>> data = snap.pp.import_data(snap.datasets.pbmc500(downsample=True), chrom_sizes=snap.genome.hg38, sorted_by_barcode=False) + >>> snap.metrics.tsse(data, snap.genome.hg38) + >>> print(data.obs['tsse'].head()) + AAACTGCAGACTCGGA-1 32.129514 + AAAGATGCACCTATTT-1 22.052786 + AAAGATGCAGATACAA-1 27.109808 + AAAGGGCTCGCTCTAC-1 24.990329 + AAATGAGAGTCCCGCA-1 33.264463 + Name: tsse, dtype: float64 """ gene_anno = gene_anno.fetch_annotations() if isinstance(gene_anno, Genome) else gene_anno if isinstance(adata, list): return snapatac2._utils.anndata_par( adata, - lambda x: tsse(x, gene_anno, inplace), + lambda x: tsse(x, gene_anno, inplace=inplace), n_jobs=n_jobs, ) else: @@ -56,7 +68,7 @@ def tsse( return result def frip( - adata: AnnData | list[AnnData], + adata: internal.AnnData | list[internal.AnnData], regions: dict[str, Path | list[str]], *, inplace: bool = True, @@ -93,15 +105,14 @@ def frip( Examples -------- >>> import snapatac2 as snap - >>> data = snap.read(snap.datasets.pbmc5k(type='h5ad'), backed=None) - >>> snap.pp.add_frip(data, {"peaks_frac": snap.datasets.cre_HEA()}) + >>> data = snap.pp.import_data(snap.datasets.pbmc500(downsample=True), chrom_sizes=snap.genome.hg38, sorted_by_barcode=False) + >>> snap.metrics.frip(data, {"peaks_frac": snap.datasets.cre_HEA()}) >>> print(data.obs['peaks_frac'].head()) - index - AAACGAAAGACGTCAG-1 0.708841 - AAACGAAAGATTGACA-1 0.731711 - AAACGAAAGGGTCCCT-1 0.692434 - AAACGAACAATTGTGC-1 0.694849 - AAACGAACACTCGTGG-1 0.687787 + AAACTGCAGACTCGGA-1 0.715930 + AAAGATGCACCTATTT-1 0.697364 + AAAGATGCAGATACAA-1 0.713615 + AAAGGGCTCGCTCTAC-1 0.678428 + AAATGAGAGTCCCGCA-1 0.724910 Name: peaks_frac, dtype: float64 """ @@ -114,7 +125,7 @@ def frip( if isinstance(adata, list): return snapatac2._utils.anndata_par( adata, - lambda x: frip(x, regions, inplace), + lambda x: frip(x, regions, inplace=inplace), n_jobs=n_jobs, ) else: @@ -127,7 +138,7 @@ def frip( def frag_size_distr( - adata: AnnData | list[AnnData], + adata: internal.AnnData | list[internal.AnnData], *, max_recorded_size: int = 1000, add_key: str = "frag_size_distr", diff --git a/snapatac2-python/snapatac2/plotting/__init__.py b/snapatac2-python/snapatac2/plotting/__init__.py index 90645a255..d439ae7c0 100644 --- a/snapatac2-python/snapatac2/plotting/__init__.py +++ b/snapatac2-python/snapatac2/plotting/__init__.py @@ -54,7 +54,7 @@ def tsse( .. plotly:: >>> import snapatac2 as snap - >>> data = snap.read(str(snap.datasets.pbmc5k(type='gene'))) + >>> data = snap.read(snap.datasets.pbmc5k(type='h5ad')) >>> fig = snap.pl.tsse(data, show=False, out_file=None) >>> fig.show() """ @@ -190,17 +190,18 @@ def regions( import polars as pl import plotly.graph_objects as go + peaks = np.concatenate([[x for x in p] for p in peaks.values()]) count = aggregate_X(adata, groupby=groupby, normalize="RPKM") names = count.obs_names count = pl.DataFrame(count.X.T) count.columns = list(names) idx_map = {x: i for i, x in enumerate(adata.var_names)} - idx = [idx_map[x] for x in np.concatenate(list(peaks.values()))] + idx = [idx_map[x] for x in peaks] mat = np.log2(1 + count.to_numpy()[idx, :]) trace = go.Heatmap( x=count.columns, - y=np.concatenate(list(peaks.values()))[::-1], + y=peaks[::-1], z=mat, type='heatmap', colorscale='Viridis', diff --git a/snapatac2-python/snapatac2/preprocessing/_basic.py b/snapatac2-python/snapatac2/preprocessing/_basic.py index 20102787d..2c36f8d0a 100644 --- a/snapatac2-python/snapatac2/preprocessing/_basic.py +++ b/snapatac2-python/snapatac2/preprocessing/_basic.py @@ -3,11 +3,10 @@ from pathlib import Path import numpy as np -import anndata as ad +from anndata import AnnData import logging import snapatac2 -from snapatac2._snapatac2 import AnnData, AnnDataSet, PyFlagStat import snapatac2._snapatac2 as internal from snapatac2.genome import Genome @@ -27,7 +26,7 @@ def make_fragment_file( shift_right: int = -5, min_mapq: int | None = 30, chunk_size: int = 50000000, -) -> PyFlagStat: +) -> internal.PyFlagStat: """ Convert a BAM file to a fragment file. @@ -108,7 +107,7 @@ def import_data( tempdir: Path | None = None, backend: Literal['hdf5'] = 'hdf5', n_jobs: int = 8, -) -> AnnData: +) -> internal.AnnData: """Import data fragment files and compute basic QC metrics. A fragment is defined as the sequencing output corresponding to one location in the genome. @@ -191,12 +190,12 @@ def import_data( Examples -------- >>> import snapatac2 as snap - >>> data = snap.pp.import_data(snap.datasets.pbmc500(), genome=snap.genome.hg38, sorted_by_barcode=False) + >>> data = snap.pp.import_data(snap.datasets.pbmc500(downsample=True), chrom_sizes=snap.genome.hg38, sorted_by_barcode=False) >>> print(data) - AnnData object with n_obs × n_vars = 816 × 0 + AnnData object with n_obs × n_vars = 585 × 0 obs: 'n_fragment', 'frac_dup', 'frac_mito' uns: 'reference_sequences' - obsm: 'fragment' + obsm: 'fragment_paired' """ chrom_sizes = chrom_sizes.chrom_sizes if isinstance(chrom_sizes, Genome) else chrom_sizes if len(chrom_sizes) == 0: @@ -212,23 +211,23 @@ def import_data( if isinstance(fragment_file, list): n = len(fragment_file) if file is None: - adatas = [ad.AnnData() for _ in range(n)] + adatas = [AnnData() for _ in range(n)] else: if len(file) != n: raise ValueError("The length of 'file' must be the same as the length of 'fragment_file'") - adatas = [AnnData(filename=f, backend=backend) for f in file] + adatas = [internal.AnnData(filename=f, backend=backend) for f in file] snapatac2._utils.anndata_ipar( list(enumerate(adatas)), lambda x: internal.import_fragments( - x[1], fragment_file[x[0]], chrom_sizes, min_num_fragments, + x[1], fragment_file[x[0]], chrom_sizes, chrM, min_num_fragments, sorted_by_barcode, shift_left, shift_right, chunk_size, whitelist, tempdir, ), n_jobs=n_jobs, ) return adatas else: - adata = ad.AnnData() if file is None else AnnData(filename=file, backend=backend) + adata = AnnData() if file is None else internal.AnnData(filename=file, backend=backend) internal.import_fragments( adata, fragment_file, chrom_sizes, chrM, min_num_fragments, sorted_by_barcode, shift_left, shift_right, chunk_size, whitelist, tempdir, @@ -245,7 +244,7 @@ def import_contacts( chunk_size: int = 2000, tempdir: Path | None = None, backend: Literal['hdf5'] = 'hdf5', -) -> AnnData: +) -> internal.AnnData: """Import chromatin contacts. Parameters @@ -289,14 +288,14 @@ def import_contacts( if chrom_size is None: chrom_size = genome.chrom_sizes - adata = ad.AnnData() if file is None else AnnData(filename=file, backend=backend) + adata = AnnData() if file is None else internal.AnnData(filename=file, backend=backend) internal.import_contacts( adata, contact_file, chrom_size, sorted_by_barcode, chunk_size, tempdir ) return adata def add_tile_matrix( - adata: AnnData | list[AnnData], + adata: internal.AnnData | list[internal.AnnData], *, bin_size: int = 500, inplace: bool = True, @@ -305,7 +304,7 @@ def add_tile_matrix( file: Path | None = None, backend: Literal['hdf5'] = 'hdf5', n_jobs: int = 8, -) -> AnnData | None: +) -> internal.AnnData | None: """Generate cell by bin count matrix. This function is used to generate and add a cell by bin count matrix to the AnnData @@ -353,13 +352,13 @@ def add_tile_matrix( Examples -------- >>> import snapatac2 as snap - >>> data = snap.pp.import_data(snap.datasets.pbmc500(), genome=snap.genome.hg38, sorted_by_barcode=False) + >>> data = snap.pp.import_data(snap.datasets.pbmc500(downsample=True), chrom_sizes=snap.genome.hg38, sorted_by_barcode=False) >>> snap.pp.add_tile_matrix(data, bin_size=500) >>> print(data) - AnnData object with n_obs × n_vars = 816 × 6062095 - obs: 'tsse', 'n_fragment', 'frac_dup', 'frac_mito' + AnnData object with n_obs × n_vars = 585 × 6062095 + obs: 'n_fragment', 'frac_dup', 'frac_mito' uns: 'reference_sequences' - obsm: 'fragment' + obsm: 'fragment_paired' """ if isinstance(exclude_chroms, str): exclude_chroms = [exclude_chroms] @@ -376,16 +375,16 @@ def add_tile_matrix( else: if file is None: if adata.isbacked: - out = ad.AnnData(obs=adata.obs[:].to_pandas()) + out = AnnData(obs=adata.obs[:].to_pandas()) else: - out = ad.AnnData(obs=adata.obs[:]) + out = AnnData(obs=adata.obs[:]) else: - out = AnnData(filename=file, backend=backend, obs=adata.obs[:]) + out = internal.AnnData(filename=file, backend=backend, obs=adata.obs[:]) internal.mk_tile_matrix(adata, bin_size, chunk_size, exclude_chroms, out) return out def make_peak_matrix( - adata: AnnData | AnnDataSet, + adata: internal.AnnData | internal.AnnDataSet, *, use_rep: str | list[str] | None = None, inplace: bool = False, @@ -394,7 +393,7 @@ def make_peak_matrix( peak_file: Path | None = None, chunk_size: int = 500, use_x: bool = False, -) -> AnnData: +) -> internal.AnnData: """Generate cell by peak count matrix. This function will generate a cell by peak count matrix and store it in a @@ -443,11 +442,11 @@ def make_peak_matrix( Examples -------- >>> import snapatac2 as snap - >>> data = snap.pp.import_data(snap.datasets.pbmc500(), genome=snap.genome.hg38, sorted_by_barcode=False) + >>> data = snap.pp.import_data(snap.datasets.pbmc500(downsample=True), chrom_sizes=snap.genome.hg38, sorted_by_barcode=False) >>> peak_mat = snap.pp.make_peak_matrix(data, peak_file=snap.datasets.cre_HEA()) >>> print(peak_mat) - AnnData object with n_obs × n_vars = 816 × 1154611 - obs: 'tsse', 'n_fragment', 'frac_dup', 'frac_mito' + AnnData object with n_obs × n_vars = 585 × 1154611 + obs: 'n_fragment', 'frac_dup', 'frac_mito' """ import gzip @@ -476,16 +475,16 @@ def make_peak_matrix( else: if file is None: if adata.isbacked: - out = ad.AnnData(obs=adata.obs[:].to_pandas()) + out = AnnData(obs=adata.obs[:].to_pandas()) else: - out = ad.AnnData(obs=adata.obs[:]) + out = AnnData(obs=adata.obs[:]) else: - out = AnnData(filename=file, backend=backend, obs=adata.obs[:]) + out = internal.AnnData(filename=file, backend=backend, obs=adata.obs[:]) internal.mk_peak_matrix(adata, peaks, chunk_size, use_x, out) return out def make_gene_matrix( - adata: AnnData | AnnDataSet, + adata: internal.AnnData | internal.AnnDataSet, gene_anno: Genome | Path, *, inplace: bool = False, @@ -494,7 +493,7 @@ def make_gene_matrix( chunk_size: int = 500, use_x: bool = False, id_type: Literal['gene', 'transcript'] = "gene", -) -> AnnData: +) -> internal.AnnData: """Generate cell by gene activity matrix. Generate cell by gene activity matrix by counting the TN5 insertions in gene @@ -539,11 +538,11 @@ def make_gene_matrix( Examples -------- >>> import snapatac2 as snap - >>> data = snap.pp.import_data(snap.datasets.pbmc500(), genome=snap.genome.hg38, sorted_by_barcode=False) + >>> data = snap.pp.import_data(snap.datasets.pbmc500(downsample=True), chrom_sizes=snap.genome.hg38, sorted_by_barcode=False) >>> gene_mat = snap.pp.make_gene_matrix(data, gene_anno=snap.genome.hg38) >>> print(gene_mat) - AnnData object with n_obs × n_vars = 816 × 60606 - obs: 'tsse', 'n_fragment', 'frac_dup', 'frac_mito' + AnnData object with n_obs × n_vars = 585 × 60606 + obs: 'n_fragment', 'frac_dup', 'frac_mito' """ if isinstance(gene_anno, Genome): gene_anno = gene_anno.fetch_annotations() @@ -553,21 +552,22 @@ def make_gene_matrix( else: if file is None: if adata.isbacked: - out = ad.AnnData(obs=adata.obs[:].to_pandas()) + out = AnnData(obs=adata.obs[:].to_pandas()) else: - out = ad.AnnData(obs=adata.obs[:]) + out = AnnData(obs=adata.obs[:]) else: - out = AnnData(filename=file, backend=backend, obs=adata.obs[:]) + out = internal.AnnData(filename=file, backend=backend, obs=adata.obs[:]) internal.mk_gene_matrix(adata, gene_anno, chunk_size, use_x, id_type, out) return out def filter_cells( - data: AnnData, + data: internal.AnnData | list[internal.AnnData], min_counts: int | None = 1000, min_tsse: float | None = 5.0, max_counts: int | None = None, max_tsse: float | None = None, inplace: bool = True, + n_jobs: int = 8, ) -> np.ndarray | None: """ Filter cell outliers based on counts and numbers of genes expressed. @@ -580,6 +580,8 @@ def filter_cells( data The (annotated) data matrix of shape `n_obs` x `n_vars`. Rows correspond to cells and columns to regions. + `data` can also be a list of AnnData objects. + In this case, the function will be applied to each AnnData object in parallel. min_counts Minimum number of counts required for a cell to pass filtering. min_tsse @@ -590,6 +592,8 @@ def filter_cells( Maximum TSS enrichment score expressed required for a cell to pass filtering. inplace Perform computation inplace or return result. + n_jobs + Number of parallel jobs to use when `data` is a list. Returns ------- @@ -598,6 +602,17 @@ def filter_cells( a boolean index mask that does filtering, where `True` means that the cell is kept, `False` means the cell is removed. """ + if isinstance(data, list): + result = snapatac2._utils.anndata_par( + data, + lambda x: filter_cells(x, min_counts, min_tsse, max_counts, max_tsse, inplace=inplace), + n_jobs=n_jobs, + ) + if inplace: + return None + else: + return result + selected_cells = True if min_counts: selected_cells &= data.obs["n_fragment"] >= min_counts if max_counts: selected_cells &= data.obs["n_fragment"] <= max_counts @@ -631,7 +646,7 @@ def _find_most_accessible_features( def select_features( - adata: AnnData | AnnDataSet | list[AnnData], + adata: internal.AnnData | internal.AnnDataSet | list[internal.AnnData], n_features: int = 500000, filter_lower_quantile: float = 0.005, filter_upper_quantile: float = 0.005, diff --git a/snapatac2-python/snapatac2/preprocessing/_harmony.py b/snapatac2-python/snapatac2/preprocessing/_harmony.py index 2b3ff2aa1..cd0b14b9c 100644 --- a/snapatac2-python/snapatac2/preprocessing/_harmony.py +++ b/snapatac2-python/snapatac2/preprocessing/_harmony.py @@ -5,11 +5,11 @@ import numpy as np -from snapatac2._snapatac2 import AnnData, AnnDataSet +import snapatac2._snapatac2 as internal from snapatac2._utils import is_anndata def harmony( - adata: AnnData | AnnDataSet | np.ndarray, + adata: internal.AnnData | internal.AnnDataSet | np.ndarray, *, batch: str | list[str], use_rep: str = "X_spectral", diff --git a/snapatac2-python/snapatac2/preprocessing/_knn.py b/snapatac2-python/snapatac2/preprocessing/_knn.py index 1b43c849a..99f21d143 100644 --- a/snapatac2-python/snapatac2/preprocessing/_knn.py +++ b/snapatac2-python/snapatac2/preprocessing/_knn.py @@ -5,11 +5,10 @@ from scipy.sparse import csr_matrix from snapatac2._utils import is_anndata -from snapatac2 import _snapatac2 -from snapatac2._snapatac2 import AnnData, AnnDataSet +import snapatac2._snapatac2 as internal def knn( - adata: AnnData | AnnDataSet | np.ndarray, + adata: internal.AnnData | internal.AnnDataSet | np.ndarray, n_neighbors: int = 50, use_dims: int | list[int] | None = None, use_rep: str = 'X_spectral', @@ -67,7 +66,7 @@ def knn( n = data.shape[0] if method == 'hora': - adj = _snapatac2.approximate_nearest_neighbour_graph( + adj = internal.approximate_nearest_neighbour_graph( data.astype(np.float32), n_neighbors) elif method == 'pynndescent': import pynndescent @@ -78,7 +77,7 @@ def knn( indptr = np.arange(0, distances.size + 1, n_neighbors) adj = csr_matrix((distances, indices, indptr), shape=(n, n)) elif method == 'kdtree': - adj = _snapatac2.nearest_neighbour_graph(data, n_neighbors) + adj = internal.nearest_neighbour_graph(data, n_neighbors) else: raise ValueError("method must be one of 'hora', 'pynndescent', 'kdtree'") diff --git a/snapatac2-python/snapatac2/preprocessing/_mnn_correct.py b/snapatac2-python/snapatac2/preprocessing/_mnn_correct.py index 94d5780b4..6b5659561 100644 --- a/snapatac2-python/snapatac2/preprocessing/_mnn_correct.py +++ b/snapatac2-python/snapatac2/preprocessing/_mnn_correct.py @@ -4,11 +4,11 @@ import itertools from scipy.special import logsumexp -from snapatac2._snapatac2 import AnnData, AnnDataSet +import snapatac2._snapatac2 as internal from snapatac2._utils import is_anndata def mnc_correct( - adata: AnnData | AnnDataSet | np.adarray, + adata: internal.AnnData | internal.AnnDataSet | np.adarray, *, batch: str | list[str], n_neighbors: int = 5, diff --git a/snapatac2-python/snapatac2/preprocessing/_scanorama.py b/snapatac2-python/snapatac2/preprocessing/_scanorama.py index e335b1c2e..a8551b018 100644 --- a/snapatac2-python/snapatac2/preprocessing/_scanorama.py +++ b/snapatac2-python/snapatac2/preprocessing/_scanorama.py @@ -3,11 +3,11 @@ import numpy as np import itertools -from snapatac2._snapatac2 import AnnData, AnnDataSet +import snapatac2._snapatac2 as internal from snapatac2._utils import is_anndata def scanorama_integrate( - adata: AnnData | AnnDataSet | np.adarray, + adata: internal.AnnData | internal.AnnDataSet | np.adarray, *, batch: str | list[str], n_neighbors: int = 20, diff --git a/snapatac2-python/snapatac2/preprocessing/_scrublet.py b/snapatac2-python/snapatac2/preprocessing/_scrublet.py index 6729b45e1..e4225fbeb 100644 --- a/snapatac2-python/snapatac2/preprocessing/_scrublet.py +++ b/snapatac2-python/snapatac2/preprocessing/_scrublet.py @@ -5,14 +5,14 @@ import numpy as np import scipy.sparse as ss import logging -import anndata as ad +from anndata import AnnData from .._utils import chunks, anndata_par -from snapatac2._snapatac2 import AnnData, approximate_nearest_neighbour_graph, nearest_neighbour_graph +import snapatac2._snapatac2 as internal from snapatac2.tools._embedding import spectral def scrublet( - adata: AnnData | list[AnnData], + adata: internal.AnnData | list[internal.AnnData], features: str | np.ndarray | None = "selected", n_comps: int = 15, sim_doublet_ratio: float = 2.0, @@ -124,7 +124,7 @@ def scrublet( return probs, doublet_scores_obs def filter_doublets( - adata: AnnData | list[AnnData], + adata: internal.AnnData | list[internal.AnnData], probability_threshold: float | None = 0.5, score_threshold: float | None = None, inplace: bool = True, @@ -255,7 +255,7 @@ def scrub_doublets_core( del count_matrix_sim gc.collect() _, evecs = spectral( - ad.AnnData(X=merged_matrix, dtype=merged_matrix.dtype), + AnnData(X=merged_matrix, dtype=merged_matrix.dtype), features=None, n_comps=n_comps, inplace=False, @@ -357,10 +357,10 @@ def calculate_doublet_scores( # Find k_adj nearest neighbors if use_approx_neighbors: - knn = approximate_nearest_neighbour_graph( + knn = internal.approximate_nearest_neighbour_graph( manifold.astype(np.float32), k_adj) else: - knn = nearest_neighbour_graph(manifold, k_adj) + knn = internal.nearest_neighbour_graph(manifold, k_adj) indices = knn.indices indptr = knn.indptr neighbors = np.vstack( diff --git a/snapatac2-python/snapatac2/tools/_call_peaks.py b/snapatac2-python/snapatac2/tools/_call_peaks.py index bd92a8622..02c5fbe3c 100644 --- a/snapatac2-python/snapatac2/tools/_call_peaks.py +++ b/snapatac2-python/snapatac2/tools/_call_peaks.py @@ -84,7 +84,7 @@ def macs3( """ from MACS3.Signal.PeakDetect import PeakDetect from math import log - from multiprocess import Pool + from multiprocess import get_context from tqdm import tqdm import tempfile @@ -150,7 +150,7 @@ def _call_peaks(tags): return _snapatac2.find_reproducible_peaks(merged, others, blacklist) logging.info("Calling peaks...") - with Pool(n_jobs) as p: + with get_context("spawn").Pool(n_jobs) as p: peaks = list(tqdm(p.imap(_call_peaks, list(fragments.values())), total=len(fragments))) peaks = {k: v for k, v in zip(fragments.keys(), peaks)} diff --git a/snapatac2-python/snapatac2/tools/_clustering.py b/snapatac2-python/snapatac2/tools/_clustering.py index 9bda632a5..fc00f26d0 100644 --- a/snapatac2-python/snapatac2/tools/_clustering.py +++ b/snapatac2-python/snapatac2/tools/_clustering.py @@ -4,12 +4,11 @@ import scipy.sparse as ss import numpy as np -from snapatac2._snapatac2 import AnnData, AnnDataSet -import snapatac2._snapatac2 as _snapatac2 +import snapatac2._snapatac2 as internal from snapatac2._utils import get_igraph_from_adjacency, is_anndata def leiden( - adata: AnnData | AnnDataSet | ss.spmatrix, + adata: internal.AnnData | internal.AnnDataSet | ss.spmatrix, resolution: float = 1, objective_function: Literal['CPM', 'modularity', 'RBConfiguration'] = 'modularity', min_cluster_size: int = 5, @@ -137,7 +136,7 @@ def leiden( return groups def kmeans( - adata: AnnData | AnnDataSet | np.ndarray, + adata: internal.AnnData | internal.AnnDataSet | np.ndarray, n_clusters: int, n_iterations: int = -1, random_state: int = 0, @@ -182,7 +181,7 @@ def kmeans( data = adata.obsm[use_rep] else: data = adata - groups = _snapatac2.kmeans(n_clusters, data) + groups = internal.kmeans(n_clusters, data) groups = np.array(groups, dtype=np.compat.unicode) if inplace: adata.obs[key_added] = polars.Series( @@ -201,7 +200,7 @@ def kmeans( return groups def hdbscan( - adata: AnnData, + adata: internal.AnnData, min_cluster_size: int = 5, min_samples: int | None = None, cluster_selection_epsilon: float = 0.0, @@ -270,7 +269,7 @@ def hdbscan( ) def dbscan( - adata: AnnData, + adata: internal.AnnData, eps: float = 0.5, min_samples: int = 5, leaf_size: int = 30, diff --git a/snapatac2-python/snapatac2/tools/_embedding.py b/snapatac2-python/snapatac2/tools/_embedding.py index 0b43c4591..083e499f8 100644 --- a/snapatac2-python/snapatac2/tools/_embedding.py +++ b/snapatac2-python/snapatac2/tools/_embedding.py @@ -8,13 +8,12 @@ import math from snapatac2._utils import get_igraph_from_adjacency, is_anndata -from snapatac2._snapatac2 import (AnnData, AnnDataSet, jm_regress, jaccard_similarity, - cosine_similarity, spectral_embedding, multi_spectral_embedding, spectral_embedding_nystrom) +import snapatac2._snapatac2 as internal __all__ = ['umap2', 'umap', 'spectral', 'multi_spectral'] def umap2( - adata: AnnData | AnnDataSet | np.ndarray, + adata: internal.AnnData | internal.AnnDataSet | np.ndarray, n_comps: int = 2, key_added: str = 'umap', random_state: int = 0, @@ -62,7 +61,7 @@ def umap2( return umap def umap( - adata: AnnData | AnnDataSet | np.ndarray, + adata: internal.AnnData | internal.AnnDataSet | np.ndarray, n_comps: int = 2, use_dims: int | list[int] | None = None, use_rep: str = "X_spectral", @@ -119,7 +118,7 @@ def idf(data, features=None): return np.log(n / (1 + count)) def spectral( - adata: AnnData | AnnDataSet, + adata: internal.AnnData | internal.AnnDataSet, n_comps: int = 30, features: str | np.ndarray | None = "selected", random_state: int = 0, @@ -217,7 +216,7 @@ def spectral( if sample_size >= n_sample: if distance_metric == "cosine": - evals, evecs = spectral_embedding(adata, features, n_comps, random_state, feature_weights) + evals, evecs = internal.spectral_embedding(adata, features, n_comps, random_state, feature_weights) else: if feature_weights is None: feature_weights = idf(adata, features) @@ -232,7 +231,7 @@ def spectral( weighted_by_degree = False else: weighted_by_degree = True - v, u = spectral_embedding_nystrom(adata, features, n_comps, sample_size, weighted_by_degree, chunk_size) + v, u = internal.spectral_embedding_nystrom(adata, features, n_comps, sample_size, weighted_by_degree, chunk_size) evals, evecs = orthogonalize(v, u) else: if feature_weights is None: @@ -276,9 +275,9 @@ def __init__( self.out_dim = out_dim self.distance = distance if (self.distance == "jaccard"): - self.compute_similarity = lambda x, y=None: jaccard_similarity(x, y, feature_weights) + self.compute_similarity = lambda x, y=None: internal.jaccard_similarity(x, y, feature_weights) elif (self.distance == "cosine"): - self.compute_similarity = lambda x, y=None: cosine_similarity(x, y, feature_weights) + self.compute_similarity = lambda x, y=None: internal.cosine_similarity(x, y, feature_weights) elif (self.distance == "rbf"): from sklearn.metrics.pairwise import rbf_kernel self.compute_similarity = lambda x, y=None: rbf_kernel(x, y) @@ -385,7 +384,7 @@ def orthogonalize(evals, evecs): class JaccardNormalizer: def __init__(self, jm, c): - (slope, intersect) = jm_regress(jm, c) + (slope, intersect) = internal.jm_regress(jm, c) self.slope = slope self.intersect = intersect self.outlier = None @@ -452,7 +451,7 @@ def f(v): return sp.sparse.linalg.eigsh(A, k=k) def multi_spectral( - adatas: list[AnnData] | list[AnnDataSet], + adatas: list[internal.AnnData] | list[internal.AnnDataSet], n_comps: int = 30, features: str | list[str] | list[np.ndarray] | None = "selected", weights: list[float] | None = None, @@ -501,7 +500,7 @@ def multi_spectral( if weights is None: weights = [1.0 for _ in adatas] - evals, evecs = multi_spectral_embedding(adatas, features, weights, n_comps, random_state) + evals, evecs = internal.multi_spectral_embedding(adatas, features, weights, n_comps, random_state) if weighted_by_sd: idx = [i for i in range(evals.shape[0]) if evals[i] > 0] diff --git a/snapatac2-python/snapatac2/tools/_misc.py b/snapatac2-python/snapatac2/tools/_misc.py index 9bc61e0b0..31e7c19cd 100644 --- a/snapatac2-python/snapatac2/tools/_misc.py +++ b/snapatac2-python/snapatac2/tools/_misc.py @@ -6,7 +6,7 @@ import numpy as np import functools -from snapatac2._snapatac2 import AnnData, AnnDataSet +import snapatac2._snapatac2 as internal from snapatac2._utils import is_anndata from snapatac2.tools import leiden from snapatac2.preprocessing import knn @@ -14,11 +14,11 @@ __all__ = ['aggregate_X', 'aggregate_cells'] def aggregate_X( - adata: AnnData | AnnDataSet, + adata: internal.AnnData | internal.AnnDataSet, groupby: str | list[str] | None = None, normalize: Literal["RPM", "RPKM"] | None = None, file: Path | None = None, -) -> np.ndarray | AnnData: +) -> np.ndarray | internal.AnnData: """ Aggregate values in adata.X in a row-wise fashion. @@ -43,9 +43,8 @@ def aggregate_X( If `grouby` is `None`, return a 1d array. Otherwise, return an AnnData object. """ - import polars as pl from natsort import natsorted - import anndata as ad + from anndata import AnnData def norm(x): if normalize is None: @@ -79,15 +78,15 @@ def norm(x): keys, values = zip(*result.items()) if file is None: - out_adata = ad.AnnData(X=np.array(values)) + out_adata = AnnData(X=np.array(values)) else: - out_adata = AnnData(filename=file, X=np.array(values)) + out_adata = internal.AnnData(filename=file, X=np.array(values)) out_adata.obs_names = list(keys) out_adata.var_names = adata.var_names return out_adata def aggregate_cells( - adata: AnnData | AnnDataSet | np.ndarray, + adata: internal.AnnData | internal.AnnDataSet | np.ndarray, use_rep: str = 'X_spectral', target_num_cells: int | None = None, min_cluster_size: int = 50, @@ -174,7 +173,7 @@ def clustering(data): return membership def marker_enrichment( - gene_matrix: AnnData, + gene_matrix: internal.AnnData, groupby: str | list[str], markers: dict[str, list[str]], min_num_markers: int = 1, diff --git a/snapatac2-python/src/call_peaks.rs b/snapatac2-python/src/call_peaks.rs index bccc7663e..46ec2d324 100644 --- a/snapatac2-python/src/call_peaks.rs +++ b/snapatac2-python/src/call_peaks.rs @@ -1,17 +1,15 @@ use crate::utils::{open_file, AnnDataLike}; use anyhow::Context; -use bed_utils::bed::BED; use bed_utils::bed::{NarrowPeak, Strand}; use flate2::read::MultiGzDecoder; use indicatif::{ProgressIterator, ProgressStyle}; use itertools::Itertools; use polars::prelude::TakeRandom; -use snapatac2_core::preprocessing::SnapData; -use snapatac2_core::utils::clip_peak; -use snapatac2_core::utils::merge_peaks; +use snapatac2_core::{ + preprocessing::{Fragment, SnapData}, + utils::{clip_peak, merge_peaks}, +}; -use rayon::iter::{ParallelBridge, ParallelIterator}; -use std::sync::{Arc, Mutex}; use anndata::Backend; use anndata_hdf5::H5; use anyhow::{ensure, Result}; @@ -23,10 +21,12 @@ use polars::{ }; use pyanndata::data::PyDataFrame; use pyo3::prelude::*; +use rayon::iter::{ParallelBridge, ParallelIterator}; use std::collections::HashSet; use std::fs::File; use std::io::BufWriter; use std::io::Write; +use std::sync::{Arc, Mutex}; use std::{collections::HashMap, ops::Deref, path::PathBuf}; #[pyfunction] @@ -266,7 +266,7 @@ pub fn create_fwtrack_obj<'py>( File::open(&fl).with_context(|| format!("cannot open file: {}", fl.display()))?, ); bed_utils::bed::io::Reader::new(reader, None) - .into_records::>() + .into_records::() .try_for_each(|x| { let x = x?; let chr = x.chrom().as_bytes(); @@ -388,8 +388,7 @@ fn _export_tags>( if let Some((_, fl)) = files.get(&i) { let mut fl = fl.lock().unwrap(); beds.into_iter().try_for_each(|bed| { - if bed.strand().is_some() - || max_frag_size.map_or(true, |s| s >= bed.len()) + if bed.strand().is_some() || max_frag_size.map_or(true, |s| s >= bed.len()) { writeln!(fl, "{}", bed)?; }