-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
rework basic_qc based on generic constraint model
- Loading branch information
Showing
1 changed file
with
198 additions
and
155 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,173 +1,216 @@ | ||
from anndata import AnnData | ||
import scanpy as sc | ||
import numpy as np | ||
import pandas as pd | ||
from anndata import AnnData | ||
import warnings | ||
|
||
def filter_on_counts_genes(adata: AnnData, | ||
min_counts: int = 2000, | ||
max_counts: int = 100000, | ||
min_genes: int = 1000, | ||
max_genes: int = 13000, | ||
inplace: bool = False | ||
) -> AnnData | None: | ||
def _check_adata(adata, column): | ||
if not isinstance(adata, AnnData): | ||
warnings.warn('adata does not appear to be an anndata object', UserWarning) | ||
if 'qc_constraints' not in adata.uns.keys(): | ||
adata.uns['qc_constraints'] = {} | ||
if column not in adata.uns['qc_constraints'].keys(): | ||
adata.uns['qc_constraints'][column] = {} | ||
|
||
def add_range_constraint(adata, column, gt=None, lt=None, subset=None, subset_values=None): | ||
""" | ||
Filter samples based on number of counts (UMIs) and genes detected. | ||
Add a range constraint for a specific column in an AnnData object. This function allows filtering | ||
cells where values in a column fall within the specified range [gt, lt]. Open intervals are supported | ||
by leaving 'gt' or 'lt' as None. Subsetting based on an additional column will apply the condition to | ||
the subset and retain the rest of the cells. | ||
Args: | ||
adata: Anndata object. | ||
min_counts: Minimum counts detected. | ||
max_counts: Maximum counts detected. | ||
min_genes: Minimum genes detected. | ||
max_genes: Maximum genes detected. | ||
inplace: Update the adata object in place or return unmodified object with keeper_cells flagged. | ||
Parameters: | ||
----------- | ||
adata : AnnData | ||
The AnnData object where constraints will be added under adata.uns['qc_constraints']. | ||
column : str | ||
The column in 'adata.obs' for which the range constraint will be applied. | ||
gt : float or None, optional | ||
The lower bound of the range (inclusive). If None, the lower bound is open. | ||
lt : float or None, optional | ||
The upper bound of the range (inclusive). If None, the upper bound is open. | ||
subset : str or None, optional | ||
The column name in 'adata.obs' to use for subsetting the data. | ||
subset_values : list or None, optional | ||
A list of values to subset on, applied to 'subset' column. | ||
""" | ||
constraint = { | ||
'gt': gt, | ||
'lt': lt, | ||
'subset': {}, | ||
'exclude': [], | ||
'groupby': None | ||
} | ||
|
||
Returns: | ||
Returns either AnnData | None | ||
if subset and subset_values: | ||
constraint['subset'][subset] = subset_values | ||
|
||
_check_adata(adata, column) | ||
|
||
# Assign a unique key for each constraint | ||
constraint_key = f'constraint_{len(adata.uns["qc_constraints"][column]) + 1}' | ||
adata.uns['qc_constraints'][column][constraint_key] = constraint | ||
|
||
def add_exclude_constraint(adata, column, exclude_values, subset=None, subset_values=None): | ||
""" | ||
if isinstance(adata, AnnData): | ||
## Filter cells based on counts and genes detected | ||
if 'keeper_cells' not in adata.obs.columns: | ||
adata.obs["keeper_cells"] = [True] * adata.shape[0] | ||
|
||
## Filter cells based on counts detected | ||
adata.obs["keeper_cells"] &= sc.pp.filter_cells(adata, min_counts = min_counts, inplace=False)[0] | ||
adata.obs["keeper_cells"] &= sc.pp.filter_cells(adata, max_counts = max_counts, inplace=False)[0] | ||
|
||
## Filter cells based on genes detected | ||
adata.obs["keeper_cells"] &= sc.pp.filter_cells(adata, min_genes = min_genes, inplace=False)[0] | ||
adata.obs["keeper_cells"] &= sc.pp.filter_cells(adata, max_genes = max_genes, inplace=False)[0] | ||
|
||
## | ||
if inplace: | ||
adata._inplace_subset_obs(adata.obs["keeper_cells"]) | ||
return None | ||
else: | ||
return adata | ||
|
||
def filter_on_precomputed_metrics(adata: AnnData, | ||
doublet_score: float = 0.3, | ||
pct_counts_mt: float = 3.0, | ||
min_GEX_Reads_mapped_confidently_to_genome: float = 0.0, | ||
min_GEX_Reads_mapped_to_genome: float = 0.0, | ||
max_GEX_Reads_with_TSO: float = 1.0, | ||
inplace: bool = False, | ||
) -> AnnData | None: | ||
Add an exclude constraint for a specific column in an AnnData object. This function filters | ||
out cells where the column value is in the 'exclude_values' list. Subsetting based on | ||
additional conditions is also supported. | ||
Parameters: | ||
----------- | ||
adata : AnnData | ||
The AnnData object where constraints will be added. | ||
column : str | ||
The column in 'adata.obs' for which the exclude constraint will be applied. | ||
exclude_values : list | ||
A list of values that should be excluded from 'column'. | ||
subset : str or None, optional | ||
The column name in 'adata.obs' to use for subsetting the data. | ||
subset_values : list or None, optional | ||
A list of values to subset on, applied to 'subset' column. | ||
""" | ||
Filter samples based on precomputed quality control metrics. | ||
|
||
constraint = { | ||
'gt': None, | ||
'lt': None, | ||
'subset': {}, | ||
'exclude': exclude_values, | ||
'groupby': None | ||
} | ||
|
||
Args: | ||
adata: Anndata object. | ||
doublet_score: Maximum doublet score. | ||
pct_counts_mt: Maximum percentage of counts in mitochondrial genes. | ||
min_GEX_Reads_mapped_confidently_to_genome: Minimum percentage of confidently mapped reads. There is no pre-defined good practice for threshold, user must specify. | ||
min_GEX_Reads_mapped_to_genome: Minimum percentage of reads mapped to genome. There is no pre-defined good practice for threshold, user must specify. | ||
max_GEX_Reads_with_TSO: Maximum percentage of reads with TSO, per cell. There is no pre-defined good practice for threshold, user must specify. | ||
inplace: Update the adata object in place or return unmodified object with keeper_cells flagged. | ||
if subset and subset_values: | ||
constraint['subset'][subset] = subset_values | ||
|
||
Returns: | ||
Returns either AnnData | None | ||
_check_adata(adata, column) | ||
|
||
# Assign a unique key for each constraint | ||
constraint_key = f'constraint_{len(adata.uns["qc_constraints"][column]) + 1}' | ||
adata.uns['qc_constraints'][column][constraint_key] = constraint | ||
|
||
def add_group_level_constraint(adata, column, groupby, gt=None, lt=None, agg_func='mean'): | ||
""" | ||
if isinstance(adata, AnnData): | ||
## Filter cells based on counts and genes detected | ||
if 'keeper_cells' not in adata.obs.columns: | ||
adata.obs["keeper_cells"] = [True] * adata.shape[0] | ||
|
||
## Gather function parameters | ||
filter_params = locals(); del filter_params['adata']; del filter_params['inplace'] | ||
|
||
## Check that columns are present in the adata object and skip if not present. | ||
for qc_metric in list(filter_params.keys()): | ||
if qc_metric not in adata.obs.columns: | ||
warnings.warn(f"A {qc_metric} column is missing in the adata object. Skipping this metric.", UserWarning) | ||
del filter_params[qc_metric] | ||
|
||
## Filter cells based on qc metrics | ||
if "doublet_score" in filter_params.keys(): | ||
adata.obs["keeper_cells"] &= adata.obs["doublet_score"] < doublet_score | ||
if "pct_counts_mt" in filter_params.keys(): | ||
adata.obs["keeper_cells"] &= adata.obs["pct_counts_mt"] < pct_counts_mt | ||
if "GEX_Reads_mapped_confidently_to_genome" in filter_params.keys(): | ||
adata.obs["keeper_cells"] &= adata.obs["GEX_Reads_mapped_confidently_to_genome"] > GEX_Reads_mapped_confidently_to_genome | ||
if "GEX_Reads_mapped_to_genome" in filter_params.keys(): | ||
adata.obs["keeper_cells"] &= adata.obs["GEX_Reads_mapped_to_genome"] > GEX_Reads_mapped_to_genome | ||
if "GEX_Reads_with_TSO" in filter_params.keys(): | ||
adata.obs["keeper_cells"] &= adata.obs["GEX_Reads_with_TSO"] < GEX_Reads_with_TSO | ||
|
||
## | ||
if inplace: | ||
adata._inplace_subset_obs(adata.obs["keeper_cells"]) | ||
return None | ||
else: | ||
return adata | ||
|
||
def filter_utilizing_coarse_labels(adata: AnnData, | ||
coarse_label_column: str, | ||
coarse_label_map: dict = {"Neurons": ["Excitatory", "Inhibitory"], | ||
"Non-Neurons": ["Astrocytes", "Oligodendrocytes", "Microglia", "Endothelial", "Pericytes"]}, | ||
coarse_label_gene_threshold: dict = {"Neurons": 2000, "Non-Neurons": 1000}, | ||
inplace: bool = False | ||
) -> AnnData | None: | ||
Add a group-level constraint for a specific column in an AnnData object. Cells are grouped | ||
by the 'groupby' column, and an aggregation function (e.g., 'mean') is applied to each group | ||
for the specified column. Cells in groups that meet the 'gt' and 'lt' conditions are kept. | ||
Parameters: | ||
----------- | ||
adata : AnnData | ||
The AnnData object where constraints will be added. | ||
column : str | ||
The column in 'adata.obs' for which the group-level constraint will be applied. | ||
groupby : str | ||
The column in 'adata.obs' used to group the cells (e.g., cluster ID). | ||
gt : float or None, optional | ||
The lower bound for the aggregated group value. If None, the lower bound is open. | ||
lt : float or None, optional | ||
The upper bound for the aggregated group value. If None, the upper bound is open. | ||
agg_func : str, optional | ||
The aggregation function to apply to the groups. Can be 'mean', 'sum', 'std', or 'median'. | ||
Default is 'mean'. | ||
""" | ||
Filter samples based on coarse label specific thresholds for genes detected. | ||
constraint = { | ||
'gt': gt, | ||
'lt': lt, | ||
'subset': {}, | ||
'exclude': [], | ||
'groupby': groupby, | ||
'agg_func': agg_func | ||
} | ||
|
||
Args: | ||
adata: Anndata object. | ||
coarse_label_column: Column name in adata.obs containing coarse labeling identifying neuron and non-neuronal cell types. | ||
coarse_label_map: Coarse cell type labels to define specific filtering on. | ||
coarse_label_gene_threshold: Minimum genes detected for each coarse label. | ||
inplace: Update the adata object in place or return unmodified object with keeper_cells flagged. | ||
_check_adata(adata, column) | ||
|
||
# Assign a unique key for each constraint | ||
constraint_key = f'constraint_{len(adata.uns["qc_constraints"][column]) + 1}' | ||
adata.uns['qc_constraints'][column][constraint_key] = constraint | ||
|
||
def apply_constraints(adata, inplace=False): | ||
""" | ||
Apply all quality control (QC) constraints stored in 'adata.uns["qc_constraints"]' to the 'adata.obs' | ||
DataFrame. This function handles both individual cell-level constraints (range, exclude) and | ||
group-level constraints (using a 'groupby' column). The resulting 'keeper_cells' column in 'adata.obs' | ||
will indicate which cells passed the constraints. | ||
Parameters: | ||
----------- | ||
adata : AnnData | ||
The AnnData object where constraints are applied. | ||
inplace : bool, optional | ||
If True, only the cells passing the constraints will be retained in 'adata'. | ||
If False (default), the 'keeper_cells' column will be added to 'adata.obs' to indicate | ||
which cells passed the filtering. | ||
Returns: | ||
Returns either AnnData | None | ||
-------- | ||
AnnData | ||
The filtered AnnData object with the 'keeper_cells' column added to 'adata.obs'. | ||
If 'inplace=True', the returned object will only contain the filtered cells. | ||
""" | ||
if isinstance(adata, AnnData): | ||
## Filter cells based on counts and genes detected | ||
if 'keeper_cells' not in adata.obs.columns: | ||
adata.obs["keeper_cells"] = [True] * adata.shape[0] | ||
## Filter specific quality thresholds for each coarse label | ||
for coarse_label, coarse_labels in coarse_label_map.items(): | ||
adata.obs["keeper_cells"] &= ~adata.obs[coarse_label_column].isin(coarse_labels) | sc.pp.filter_cells(adata, min_genes = coarse_label_threshold[coarse_label], inplace=False)[0] | ||
## | ||
keeper_cells = pd.Series([True] * adata.n_obs, index=adata.obs.index) | ||
agg_funcs = {'sum': np.sum, 'mean': np.mean, 'std': np.std, 'median': np.median} | ||
|
||
for col, constraints_dict in adata.uns['qc_constraints'].items(): | ||
if col not in adata.obs.columns: | ||
warnings.warn(f'Column {col} not in obs, skipping this metric', UserWarning) | ||
continue | ||
|
||
column_data = adata.obs[col] | ||
col_keeper = pd.Series([True] * adata.n_obs, index=adata.obs.index) | ||
|
||
# Iterate over the dictionary of constraints | ||
for constraint_key, constraints in constraints_dict.items(): | ||
gt = pd.Series([True] * adata.n_obs, index=adata.obs.index) | ||
lt = pd.Series([True] * adata.n_obs, index=adata.obs.index) | ||
exclude = pd.Series([True] * adata.n_obs, index=adata.obs.index) | ||
subset = pd.Series([True] * adata.n_obs, index=adata.obs.index) | ||
|
||
# Apply group-level constraint | ||
if 'groupby' in constraints and constraints['groupby'] is not None: | ||
groupby = constraints['groupby'] | ||
agg_func = agg_funcs[constraints['agg_func']] | ||
|
||
if groupby in adata.obs: | ||
grouped = adata.obs.groupby(groupby)[col].agg(agg_func) | ||
|
||
group_gt = pd.Series([True] * len(grouped), index=grouped.index) | ||
group_lt = pd.Series([True] * len(grouped), index=grouped.index) | ||
|
||
if constraints['gt'] is not None: | ||
group_gt = grouped >= constraints['gt'] | ||
|
||
if constraints['lt'] is not None: | ||
group_lt = grouped <= constraints['lt'] | ||
|
||
group_keeper = group_gt & group_lt | ||
groups_to_keep = group_keeper[group_keeper].index | ||
cur_sub = adata.obs[groupby].isin(groups_to_keep) | ||
col_keeper &= cur_sub | ||
else: | ||
if constraints['gt'] is not None: | ||
gt = column_data >= constraints['gt'] | ||
|
||
if constraints['lt'] is not None: | ||
lt = column_data <= constraints['lt'] | ||
|
||
if constraints['exclude']: | ||
exclude = ~column_data.isin(constraints['exclude']) | ||
|
||
if constraints['subset'].items(): | ||
for subset_col, subset_values in constraints['subset'].items(): | ||
subset_in = adata.obs[subset_col].isin(subset_values) if subset_col in adata.obs else pd.Series([True] * adata.n_obs, index=adata.obs.index) | ||
cur_sub = (gt & lt & exclude) | ||
cur_sub |= ~subset_in | ||
col_keeper &= cur_sub | ||
else: | ||
col_keeper &= (gt & lt & exclude) | ||
|
||
keeper_cells &= col_keeper | ||
|
||
adata.obs['keeper_cells'] = keeper_cells | ||
|
||
if inplace: | ||
adata._inplace_subset_obs(adata.obs["keeper_cells"]) | ||
return None | ||
else: | ||
return adata | ||
|
||
# def filter_on_cluster_metrics(adata: AnnData, | ||
# cluster_column: str, | ||
# annotation_columns: list, | ||
# annotation_thresholds: dict, ## HOW TO ENCODE DIRECTION, E,G. > 0.3 doublet score or < 0.9 TSO. | ||
# ) -> AnnData | None: | ||
# """ | ||
# Filter samples based on entropy of quality control metrics or metadata for each cluster. | ||
# | ||
# Args: | ||
# adata: type AnnData: Anndata object. | ||
# cluster_column: Column name in adata.obs containing cluster labels. | ||
# annotation_columns: Column name in adata.obs containing entropy values. | ||
# annotation_thresholds: Minimum entropy values for each annotation being considered. | ||
# | ||
# Returns: | ||
# Returns either AnnData | None | ||
# """ | ||
# if isinstance(adata, AnnData): | ||
# ## Filter cells based on counts and genes detected | ||
# if 'keeper_cells' not in adata.obs.columns: | ||
# adata.obs["keeper_cells"] = [True] * adata.shape[0] | ||
# ## Compute cluster medians for each annotation | ||
# for anno in annotation_columns: | ||
# try: | ||
# adata.obs[anno + "_cluster_median"] = adata.obs.groupby("cluster")[anno].median().round(4) | ||
# except: | ||
# warnings.warn(f"Unable to compute cluster median for {anno}. Skipping this metric.", UserWarning) | ||
# continue | ||
# ## Apply filtering based on user provided thresholds | ||
# for anno, threshold in annotation_thresholds.items(): | ||
# adata.obs["keeper_cells"] &= adata.obs[anno + "_entropy"] > threshold | ||
# ## | ||
# if inplace: | ||
# adata._inplace_subset_obs(adata.obs["keeper_cells"]) | ||
# return None | ||
# else: | ||
# return adata | ||
adata._inplace_subset_obs(adata.obs['keeper_cells']) | ||
|
||
return adata | ||
|