diff --git a/metakb/main.py b/metakb/main.py index e4ea5ec4..db5765c0 100644 --- a/metakb/main.py +++ b/metakb/main.py @@ -3,9 +3,7 @@ from fastapi.openapi.utils import get_openapi from metakb.query import QueryHandler from metakb.version import __version__ -from metakb.schemas.app import SearchService, SearchIDService, \ - SearchStatementsService -from typing import Optional +from typing import Dict, Optional app = FastAPI( docs_url='/api/v2', @@ -15,7 +13,7 @@ query = QueryHandler() -def custom_openapi(): +def custom_openapi() -> Dict: """Generate custom fields for OpenAPI response.""" if app.openapi_schema: return app.openapi_schema @@ -38,76 +36,44 @@ def custom_openapi(): app.openapi = custom_openapi -search_summary = ("Given variation, disease, therapy, and/or gene, " - "return associated statements and propositions.") -search_response_description = "A response to a validly-formed query." -search_description = ("Return statements and propositions associated" - " to the queried concepts.") -v_description = "Variation (subject) to search" +search_studies_summary = ( + "Get nested studies from queried concepts that match all conditions provided." +) +search_studies_descr = ( + "Return nested studies associated to the queried concepts. For example, if " + "`variation` and `therapy` are provided, will return all studies that have both " + "the provided `variation` and `therapy`." +) +v_description = ("Variation (subject) to search. Can be free text or VRS Variation ID.") d_description = "Disease (object qualifier) to search" t_description = "Therapy (object) to search" g_description = "Gene to search" -s_description = "Statement ID to search" -detail_description = "Display all descriptors, methods, and documents." +s_description = ("Study ID to search.") +search_study_response_descr = "A response to a validly-formed query." 
-@app.get('/api/v2/search', - summary=search_summary, - response_description=search_response_description, - response_model=SearchService, - description=search_description, - response_model_exclude_none=True) -async def search( +@app.get('/api/v2/search/studies', + summary=search_studies_summary, + response_description=search_study_response_descr, + description=search_studies_descr,) +async def get_studies( variation: Optional[str] = Query(None, description=v_description), disease: Optional[str] = Query(None, description=d_description), therapy: Optional[str] = Query(None, description=t_description), gene: Optional[str] = Query(None, description=g_description), - statement_id: Optional[str] = Query(None, description=s_description), - detail: Optional[bool] = Query(False, description=detail_description) -): - """Search endpoint""" - resp = await query.search(variation, disease, therapy, gene, statement_id, detail) - return resp - - -search_statements_summary = ( - "Given variation, disease, therapy, and/or gene, return associated " - "nested statements containing propositions and descriptors.") -search_statement_response_descr = "A response to a validly-formed query." 
-search_statements_descr = ( - "Return nested statements associated to the queried concepts.") - - -@app.get('/api/v2/search/statements', - summary=search_statements_summary, - response_description=search_statement_response_descr, - response_model=SearchStatementsService, - description=search_statements_descr, - response_model_exclude_none=True) -async def get_statements( - variation: Optional[str] = Query(None, description=v_description), - disease: Optional[str] = Query(None, description=d_description), - therapy: Optional[str] = Query(None, description=t_description), - gene: Optional[str] = Query(None, description=g_description), - statement_id: Optional[str] = Query(None, description=s_description)): - """Return nested statements for queried concepts""" - resp = await query.search_statements(variation, disease, therapy, gene, - statement_id) - return resp - - -id_query_desc = ("Given Meta-KB statement_id, proposition_id, descriptor_id," - " document_id, or method_id return the node content.") -id_search_description = ("Return node of the queried node id.") -id_description = "Node ID to search" - - -@app.get('/api/v2/search/{id}', - summary=id_query_desc, - response_description=search_response_description, - response_model=SearchIDService, - description=id_search_description, - response_model_exclude_none=True) -async def search_by_id(id: str = Query(None, description=id_description)): - """Search by ID endpoint""" - return query.search_by_id(id) + study_id: Optional[str] = Query(None, description=s_description) +) -> dict: + """Get nested studies from queried concepts that match all conditions provided. + For example, if `variation` and `therapy` are provided, will return all studies + that have both the provided `variation` and `therapy`. + + :param variation: Variation query (Free text or VRS Variation ID) + :param disease: Disease query + :param therapy: Therapy query + :param gene: Gene query + :param study_id: Study ID query. 
+ :return: SearchStudiesService response containing nested studies and service + metadata + """ + resp = await query.search_studies(variation, disease, therapy, gene, study_id) + return resp.model_dump(exclude_none=True) diff --git a/metakb/query.py b/metakb/query.py index fd91c313..945803ab 100644 --- a/metakb/query.py +++ b/metakb/query.py @@ -1,26 +1,60 @@ """Module for queries.""" -from typing import Dict, List, Optional, Tuple -import logging +from copy import copy +from enum import Enum import json -from json.decoder import JSONDecodeError -from urllib.parse import quote +import logging +from typing import Dict, List, Optional, Tuple -from ga4gh.vrsatile.pydantic.vrsatile_models import Extension, Expression +from ga4gh.core import core_models +from ga4gh.vrs import models +from metakb.schemas.app import SourceName +from neo4j import Transaction from neo4j.graph import Node -from neo4j.data import Record -from neo4j import Transaction, Session +from pydantic import ValidationError from metakb.database import Graph from metakb.normalizers import ViccNormalizers -from metakb.schemas.app import SearchService, StatementResponse, \ - TherapeuticResponseProposition, VariationDescriptor, \ - ValueObjectDescriptor, GeneDescriptor, Method, \ - Document, SearchIDService, DiagnosticProposition, PrognosticProposition, \ - SearchStatementsService, NestedStatementResponse, PropositionType, \ - Proposition, ServiceMeta, Predicate +from metakb.schemas.annotation import Document, Method +from metakb.schemas.api import SearchStudiesService, ServiceMeta +from metakb.schemas.categorical_variation import CategoricalVariation +from metakb.schemas.variation_statement import ( + VariantTherapeuticResponseStudy, + _VariantOncogenicityStudyQualifier, +) + +logger = logging.getLogger(__name__) + + +class VariationRelation(str, Enum): + """Create enum for relation between variation and categorical variation""" + + HAS_MEMBERS = "HAS_MEMBERS" + HAS_DEFINING_CONTEXT = 
"HAS_DEFINING_CONTEXT" + + +class TherapeuticRelation(str, Enum): + """Create enum for therapeutic relation""" -logger = logging.getLogger("metakb.query") -logger.setLevel(logging.DEBUG) + HAS_COMPONENTS = "HAS_COMPONENTS" + HAS_SUBSTITUTES = "HAS_SUBSTITUTES" + + +class TherapeuticProcedureType(str, Enum): + """Create enum for therapeutic procedures""" + + COMBINATION = "CombinationTherapy" + SUBSTITUTES = "TherapeuticSubstituteGroup" + + +def _update_mappings(params: Dict) -> None: + """Update ``params.mappings`` if it exists + The mappings field will be a string and will be updated to the dict representation + + :param params: Parameters. Will be mutated if mappings field exists + """ + mappings = params.get("mappings") + if mappings: + params["mappings"] = json.loads(mappings) class QueryHandler: @@ -38,1246 +72,631 @@ def __init__(self, uri: str = "", self.driver = Graph(uri, creds).driver self.vicc_normalizers = normalizers - def get_normalized_therapy(self, therapy: str, - warnings: List[str]) -> Optional[str]: - """Get normalized therapy concept. - - :param str therapy: Therapy query - :param List[str] warnings: A list of warnings for the search query - :return: A normalized therapy concept if it exists + async def search_studies( + self, variation: Optional[str] = None, disease: Optional[str] = None, + therapy: Optional[str] = None, gene: Optional[str] = None, + study_id: Optional[str] = None + ) -> SearchStudiesService: + """Get nested studies from queried concepts that match all conditions provided. + For example, if `variation` and `therapy` are provided, will return all studies + that have both the provided `variation` and `therapy`. + + :param variation: Variation query (Free text or VRS Variation ID) + :param disease: Disease query + :param therapy: Therapy query + :param gene: Gene query + :param study_id: Study ID query. 
+ :return: SearchStudiesService response containing nested studies and service + metadata """ - _, normalized_therapy_id = \ - self.vicc_normalizers.normalize_therapy([therapy]) - - if not normalized_therapy_id: - warnings.append(f"Therapy Normalizer unable to normalize: " - f"{therapy}") - return normalized_therapy_id - - def get_normalized_disease(self, disease: str, - warnings: List[str]) -> Optional[str]: - """Get normalized disease concept. + response: Dict = { + "query": { + "variation": None, + "disease": None, + "therapy": None, + "gene": None, + "study_id": None + }, + "warnings": [], + "study_ids": [], + "studies": [], + "service_meta_": ServiceMeta() + } - :param str disease: Disease query - :param List[str] warnings: A list of warnings for the search query - :return: A normalized disease concept if it exists - """ - _, normalized_disease_id = \ - self.vicc_normalizers.normalize_disease([disease]) + normalized_terms = await self._get_normalized_terms( + variation, disease, therapy, gene, study_id, response) - if not normalized_disease_id: - warnings.append(f"Disease Normalizer unable to normalize: " - f"{disease}") - return normalized_disease_id + if normalized_terms is None: + return SearchStudiesService(**response) - async def get_normalized_variation(self, variation: str, - warnings: List[str]) -> Optional[str]: - """Get normalized variation concept. 
+ (normalized_variation, normalized_disease, + normalized_therapy, normalized_gene, study, + valid_study_id) = normalized_terms - :param str variation: Variation query - :param List[str] warnings: A list of warnings for the search query - :return: A normalized variant concept if it exists - """ - variant_norm_resp = \ - await self.vicc_normalizers.normalize_variation([variation]) - normalized_variation = None - if variant_norm_resp: - normalized_variation = variant_norm_resp.variation_id - if not normalized_variation: - # Check if VRS variation (allele, cnv, or haplotype) - if variation.startswith(("ga4gh:VA.", "ga4gh:CNV.", "ga4gh:VH.")): - normalized_variation = variation + with self.driver.session() as session: + if valid_study_id: + study_nodes = [study] + response["study_ids"].append(study["id"]) else: - warnings.append(f"Variant Normalizer unable to normalize: " - f"{variation}") - return normalized_variation + study_nodes = self._get_related_studies( + session, + normalized_variation=normalized_variation, + normalized_therapy=normalized_therapy, + normalized_disease=normalized_disease, + normalized_gene=normalized_gene + ) + response["study_ids"] = [s["id"] for s in study_nodes] - def get_normalized_gene(self, gene: str, - warnings: List[str]) -> Optional[str]: - """Get normalized gene concept. + response["studies"] = self._get_nested_studies(session, study_nodes) - :param str gene: Gene query - :param List[str] warnings: A list of warnings for the search query. - :return: A normalized gene concept if it exists - """ - _, normalized_gene_id = self.vicc_normalizers.normalize_gene([gene]) - if not normalized_gene_id: - warnings.append(f"Gene Normalizer unable to normalize: {gene}") - return normalized_gene_id + if not response["studies"]: + response["warnings"].append( + "No studies found with the provided query parameters." 
+ ) + + return SearchStudiesService(**response) - async def get_normalized_terms( + async def _get_normalized_terms( self, variation: Optional[str], disease: Optional[str], therapy: Optional[str], gene: Optional[str], - statement_id: Optional[str], response: Dict + study_id: Optional[str], response: Dict ) -> Optional[Tuple]: """Find normalized terms for queried concepts. - :param Optional[str] variation: Variation (subject) query - :param Optional[str] disease: Disease (object_qualifier) query - :param Optional[str] therapy: Therapy (object) query - :param Optional[str] gene: Gene query - :param Optional[str] statement_id: Statement ID query - :param Dict response: The response for the query + :param variation: Variation (subject) query + :param disease: Disease (object_qualifier) query + :param therapy: Therapy (object) query + :param gene: Gene query + :param study_id: Study ID query + :param response: The response for the query :return: A tuple containing the normalized concepts """ - if not (variation or disease or therapy or gene or statement_id): - response["warnings"].append("No parameters were entered.") + if not any((variation, disease, therapy, gene, study_id)): + response["warnings"].append("No query parameters were provided.") return None # Find normalized terms using VICC normalizers if therapy: response["query"]["therapy"] = therapy normalized_therapy = \ - self.get_normalized_therapy(therapy.strip(), - response["warnings"]) + self._get_normalized_therapy(therapy.strip(), response["warnings"]) else: normalized_therapy = None if disease: response["query"]["disease"] = disease normalized_disease = \ - self.get_normalized_disease(disease.strip(), - response["warnings"]) + self._get_normalized_disease(disease.strip(), response["warnings"]) else: normalized_disease = None if variation: response["query"]["variation"] = variation normalized_variation = \ - await self.get_normalized_variation(variation, response["warnings"]) + await 
self._get_normalized_variation(variation, response["warnings"]) else: normalized_variation = None if gene: response["query"]["gene"] = gene - normalized_gene = self.get_normalized_gene(gene, - response["warnings"]) + normalized_gene = self._get_normalized_gene(gene, response["warnings"]) else: normalized_gene = None - # Check that queried statement_id is valid - valid_statement_id = None - statement = None - if statement_id: - response["query"]["statement_id"] = statement_id + # Check that queried study_id is valid + valid_study_id = None + study = None + if study_id: + response["query"]["study_id"] = study_id with self.driver.session() as session: - statement = session.read_transaction( - self._get_statement_by_id, statement_id - ) - if statement: - valid_statement_id = statement.get("id") + study = self._get_study_by_id(session, study_id) + if study: + valid_study_id = study.get("id") else: response["warnings"].append( - f"Statement: {statement_id} does not exist.") + f"Study: {study_id} does not exist.") # If queried concept is given check that it is normalized / valid if (variation and not normalized_variation) or \ (therapy and not normalized_therapy) or \ (disease and not normalized_disease) or \ (gene and not normalized_gene) or \ - (statement_id and not valid_statement_id): + (study_id and not valid_study_id): return None return (normalized_variation, normalized_disease, normalized_therapy, - normalized_gene, statement, valid_statement_id) + normalized_gene, study, valid_study_id) - async def search( - self, variation: Optional[str] = None, disease: Optional[str] = None, - therapy: Optional[str] = None, gene: Optional[str] = None, - statement_id: Optional[str] = None, detail: bool = False - ) -> Dict: - """Get statements and propositions from queried concepts. 
- - :param Optional[str] variation: Variation query - :param Optional[str] disease: Disease query - :param Optional[str] therapy: Therapy query - :param Optional[str] gene: Gene query - :param Optional[str] statement_id: Statement ID query - :param bool detail: Whether or not to display all descriptors, - methods, and documents - :return: A dictionary containing the statements and propositions - with relationships to the queried concepts - """ - response: Dict = { - "query": { - "variation": None, - "disease": None, - "therapy": None, - "gene": None, - "statement_id": None, - "detail": detail - }, - "warnings": [], - "matches": { - "statements": [], - "propositions": [] - }, - "statements": [], # All Statements - "propositions": [], # All propositions - "variation_descriptors": [], - "gene_descriptors": [], - "therapy_descriptors": [], - "disease_descriptors": [], - "methods": [], - "documents": [], - "service_meta_": ServiceMeta().dict() - } - - normalized_terms = await self.get_normalized_terms( - variation, disease, therapy, gene, statement_id, response) - if normalized_terms is None: - return SearchService(**response).dict() - (normalized_variation, normalized_disease, - normalized_therapy, normalized_gene, statement, - valid_statement_id) = normalized_terms - - session = self.driver.session() - proposition_nodes = session.read_transaction( - self._get_propositions, valid_statement_id, normalized_variation, - normalized_therapy, normalized_disease, normalized_gene, - ) - - if not valid_statement_id: - # If statement ID isn't specified, get all statements - # related to a proposition - statement_nodes = list() - for p_node in proposition_nodes: - p_id = p_node.get("id") - if p_id not in response["matches"]["propositions"]: - response["matches"]["propositions"].append(p_id) - statements = session.read_transaction( - self._get_statements_from_proposition, p_id - ) - for s in statements: - statement_nodes.append(s) - s_id = s.get("id") - if s_id not in 
response["matches"]["statements"]: - response["matches"]["statements"].append(s_id) - else: - # Given Statement ID - statement_nodes = [statement] - s_id = statement.get("id") - response["matches"]["statements"].append(s_id) - - for p in proposition_nodes: - p_id = p.get("id") - if p_id not in response["matches"]["propositions"]: - response["matches"]["propositions"].append(p_id) - - # Add statements found in `supported_by` to statement_nodes - # Then add the associated proposition to proposition_nodes - for s in statement_nodes: - self.add_proposition_and_statement_nodes( - session, s.get("id"), proposition_nodes, statement_nodes - ) - - if proposition_nodes and statement_nodes: - response["statements"] = \ - self.get_statement_response(statement_nodes) - response["propositions"] = \ - self.get_propositions_response(proposition_nodes) - else: - response["warnings"].append("Could not find statements " - "associated with the queried" - " concepts.") - - if detail: - for s in response["statements"]: - vd = self._get_variation_descriptor( - response, - session.read_transaction( - self._find_node_by_id, - s["variation_descriptor"] - ) - ) - if vd not in response["variation_descriptors"]: - response["variation_descriptors"].append(vd) - if "therapy_descriptor" in s.keys(): - td = self._get_therapy_descriptor( - session.read_transaction( - self._find_node_by_id, s["therapy_descriptor"] - ) - ) - if td not in response["therapy_descriptors"]: - response["therapy_descriptors"].append(td) - else: - response["therapy_descriptors"] = [] - - dd = self._get_disease_descriptor( - session.read_transaction( - self._find_node_by_id, s["disease_descriptor"] - ) - ) - if dd not in response["disease_descriptors"]: - response["disease_descriptors"].append(dd) - - m = self._get_method( - session.read_transaction( - self._find_node_by_id, s["method"] - ) - ) - if m not in response["methods"]: - response["methods"].append(m) - - # Sometimes CIViC AIDs have supported by statements - # that 
we aren't able to transform - sb_not_found = set() - for sb_id in s["supported_by"]: - try: - document = self._get_document( - session.read_transaction( - self._find_node_by_id, sb_id - ) - ) - if document: - if document not in response["documents"]: - response["documents"].append(document) - except ValueError: - sb_not_found.add(sb_id) - if sb_not_found: - response["warnings"].append(f"Supported by evidence not " - f"yet supported in MetaKB: " - f"{sb_not_found} for " - f"{s['id']}") - else: - response["variation_descriptors"] = None - response["gene_descriptors"] = None - response["disease_descriptors"] = None - response["therapy_descriptors"] = None - response["methods"] = None - response["documents"] = None - - session.close() - return SearchService(**response).dict(by_alias=True, exclude_none=True) - - def search_by_id(self, node_id: str) -> Dict: - """Get node information given id query - - :param str node_id: Node's ID query - :return: A dictionary containing the node content - """ - valid_node_id = None - response = { - "query": node_id, - "warnings": [], - "service_meta_": ServiceMeta().dict() - } + def _get_normalized_therapy(self, therapy: str, + warnings: List[str]) -> Optional[str]: + """Get normalized therapy concept. 
- if not node_id: - response["warnings"].append("No parameters were entered.") - elif node_id.strip() == "": - response["warnings"].append("Cannot enter empty string.") - else: - node_id = node_id.strip() - if "%" not in node_id and ":" in node_id: - concept_name = quote(node_id.split(":", 1)[1]) - node_id = \ - f"{node_id.split(':', 1)[0]}" \ - f":{concept_name}" - with self.driver.session() as session: - node = session.read_transaction( - self._find_node_by_id, node_id - ) - if node: - valid_node_id = node.get("id") - else: - response["warnings"].append(f"Node: {node_id} " - f"does not exist.") - if (not node_id and not valid_node_id) or \ - (node_id and not valid_node_id): - return SearchIDService(**response).dict(exclude_none=True) - - label, *_ = node.labels - if label == "Statement": - statement = self._get_statement(node) - if statement: - response["statement"] = statement - elif label in ["Proposition", "TherapeuticResponse", - "Prognostic", "Diagnostic"]: - proposition = self._get_proposition(node) - if proposition: - response["proposition"] = proposition - elif label == "VariationDescriptor": - response["variation_descriptor"] = \ - self._get_variation_descriptor(response, node) - elif label == "TherapyDescriptor": - response["therapy_descriptor"] = \ - self._get_therapy_descriptor(node) - elif label == "DiseaseDescriptor": - response["disease_descriptor"] = self._get_disease_descriptor(node) - elif label == "GeneDescriptor": - response["gene_descriptor"] = \ - self._get_gene_descriptor(node, self._get_gene_value_object(node)) # noqa: E501 - elif label == "Document": - document = self._get_document(node) - if document: - response["document"] = document - elif label == "Method": - response["method"] = self._get_method(node) - - session.close() - return SearchIDService(**response).dict( - by_alias=True, exclude_none=True) - - async def search_statements( - self, variation: Optional[str] = None, - disease: Optional[str] = None, therapy: Optional[str] = None, 
- gene: Optional[str] = None, statement_id: Optional[str] = None - ) -> Dict: - """Get nested statements from queried concepts - - :param Optional[str] variation: Variation query - :param Optional[str] disease: Disease query - :param Optional[str] therapy: Therapy query - :param Optional[str] gene: Gene query - :param Optional[str] statement_id: Statement ID query - :return: A dictionary containing the statements with nested - propositions, descriptors, methods, and supported by documents + :param therapy: Therapy query + :param warnings: A list of warnings for the search query + :return: A normalized therapy concept if it exists """ - response: Dict = { - "query": { - "variation": None, - "disease": None, - "therapy": None, - "gene": None, - "statement_id": None - }, - "warnings": [], - "matches": { - "statements": [], - "propositions": [] - }, - "statements": [], - "service_meta_": ServiceMeta().dict() - } - - normalized_terms = await self.get_normalized_terms( - variation, disease, therapy, gene, statement_id, response) - if normalized_terms is None: - return SearchStatementsService(**response).dict() - (normalized_variation, normalized_disease, - normalized_therapy, normalized_gene, statement, - valid_statement_id) = normalized_terms - - session = self.driver.session() - statement_nodes = list() - proposition_nodes = session.read_transaction( - self._get_propositions, valid_statement_id, normalized_variation, - normalized_therapy, normalized_disease, normalized_gene - ) - - proposition_cache = dict() - if not valid_statement_id: - # If statement ID isn"t specified, get all statements - # related to a proposition - for p_node in proposition_nodes: - p_id = p_node.get("id") - if p_id not in response["matches"]["propositions"]: - response["matches"]["propositions"].append(p_id) - self._add_to_proposition_cache( - session, p_node, proposition_cache) - statements = session.read_transaction( - self._get_statements_from_proposition, p_node.get("id") - ) - for s in 
statements: - statement_nodes.append(s) - s_id = s.get("id") - if s_id not in response["matches"]["statements"]: - response["matches"]["statements"].append(s_id) - else: - # Given Statement ID - statement_nodes.append(statement) - s_id = statement.get("id") - p_node = proposition_nodes[0] - p_id = p_node.get("id") - if s_id not in response["matches"]["statements"]: - response["matches"]["statements"].append(statement_id) - if p_id not in response["matches"]["propositions"]: - response["matches"]["propositions"].append(p_id) - self._add_to_proposition_cache(session, p_node, proposition_cache) - - # Add statements found in `supported_by` to statement_nodes - # Then add the associated proposition to proposition_nodes - og_prop_nodes_len = len(proposition_nodes) - for s in statement_nodes: - self.add_proposition_and_statement_nodes( - session, s.get("id"), proposition_nodes, statement_nodes - ) - - if og_prop_nodes_len != len(proposition_nodes): - for p_node in proposition_nodes: - self._add_to_proposition_cache( - session, p_node, proposition_cache) + _, normalized_therapy_id = \ + self.vicc_normalizers.normalize_therapy([therapy]) - methods_cache: Dict = dict() - variations_cache: Dict = dict() - disease_cache: Dict = dict() - therapy_cache: Dict = dict() - document_cache: Dict = dict() - added_statements = set() + if not normalized_therapy_id: + warnings.append(f"Therapy Normalizer unable to normalize: " + f"{therapy}") + return normalized_therapy_id - for s in statement_nodes: - s_id = s.get("id") - if s_id in added_statements: - continue + def _get_normalized_disease(self, disease: str, + warnings: List[str]) -> Optional[str]: + """Get normalized disease concept. 
- statement_resp = session.read_transaction( - self._find_and_return_statement_response, s_id - ) - p_id = statement_resp.get("p_id") - proposition = proposition_cache[p_id] + :param disease: Disease query + :param warnings: A list of warnings for the search query + :return: A normalized disease concept if it exists + """ + _, normalized_disease_id = \ + self.vicc_normalizers.normalize_disease([disease]) - method_id = statement_resp["m"]["id"] - if method_id in methods_cache: - method = methods_cache[method_id] - else: - method = self.search_by_id(method_id)["method"] - methods_cache[method_id] = method + if not normalized_disease_id: + warnings.append(f"Disease Normalizer unable to normalize: " + f"{disease}") + return normalized_disease_id - variation_id = statement_resp["vid"] - if variation_id in variations_cache: - variation_descr = variations_cache[variation_id] - else: - variation_descr = self._get_variation_descriptor( - {}, - session.read_transaction( - self._find_node_by_id, variation_id), - gene_context_by_id=False - ) - variations_cache[variation_id] = variation_descr + async def _get_normalized_variation(self, variation: str, + warnings: List[str]) -> Optional[str]: + """Get normalized variation concept. 
- if proposition.type == PropositionType.PREDICTIVE: - therapy_id = statement_resp.get("tid") - if therapy_id in therapy_cache: - therapy_descr = therapy_cache[therapy_id] - else: - therapy_descr = self._get_therapy_descriptor( - session.read_transaction(self._find_node_by_id, - therapy_id) - ) - therapy_cache[therapy_id] = therapy_descr - else: - therapy_descr = None + :param variation: Variation query + :param warnings: A list of warnings for the search query + :return: A normalized variant concept if it exists + """ + variant_norm_resp = \ + await self.vicc_normalizers.normalize_variation([variation]) + normalized_variation = variant_norm_resp.id if variant_norm_resp else None - disease_id = statement_resp.get("did") - if disease_id in disease_cache: - disease_descr = disease_cache[disease_id] + if not normalized_variation: + # Check if VRS variation (allele, copy number change, copy number count) + if variation.startswith(("ga4gh:VA.", "ga4gh:CX.", "ga4gh:CN.")): + normalized_variation = variation else: - disease_descr = self._get_disease_descriptor( - session.read_transaction(self._find_node_by_id, - disease_id) - ) - disease_cache[disease_id] = disease_descr + warnings.append(f"Variation Normalizer unable to normalize: " + f"{variation}") + return normalized_variation - supported_by = list() - sb_not_found = set() - sb_list = session.read_transaction( - self._find_and_return_supported_by, s_id - ) - for sb in sb_list: - sb_id = sb.get("id") - try: - if sb_id in document_cache: - document = document_cache[sb_id] - else: - document = self._get_document( - session.read_transaction( - self._find_node_by_id, sb_id - ) - ) + def _get_normalized_gene(self, gene: str, warnings: List[str]) -> Optional[str]: + """Get normalized gene concept. 
- if document: - supported_by.append(document) - document_cache[sb_id] = document - else: - if sb_id.startswith("civic.eid"): - supported_by.append(sb_id) - except ValueError: - sb_not_found.add(sb_id) - if sb_not_found: - response["warnings"].append(f"Supported by evidence not " - f"yet supported in MetaKB: " - f"{sb_not_found} for " - f"{s['id']}") - - params = { - "id": s_id, - "description": s.get("description"), - "direction": s.get("direction"), - "evidence_level": s.get("evidence_level"), - "variation_origin": s.get("variation_origin"), - "proposition": proposition, - "variation_descriptor": variation_descr, - "therapy_descriptor": therapy_descr, - "disease_descriptor": disease_descr, - "method": method, - "supported_by": supported_by - } - response["statements"].append( - NestedStatementResponse(**params).dict()) - added_statements.add(s_id) - session.close() - return SearchStatementsService(**response).dict( - by_alias=True, exclude_none=True) - - def _add_to_proposition_cache(self, session: Session, p_node: Node, - proposition_cache: Dict) -> None: - """Add a proposition to `proposition_cache` - - :param Session session: Neo4j driver session - :param Node p_node: Proposition node - :param Dict proposition_cache: Proposition lookup dictionary - """ - p_id = p_node.get("id") - if p_id not in proposition_cache: - proposition_resp = session.read_transaction( - self._find_and_return_proposition_response, - p_id - ) - proposition_type = p_node.get("type") - proposition = { - "id": p_id, - "type": proposition_type, - "predicate": p_node.get("predicate"), - "subject": proposition_resp["subject"], - "object_qualifier": proposition_resp["object_qualifier"] - } - if proposition_type == PropositionType.PREDICTIVE: - proposition["object"] = proposition_resp["object"] - proposition = \ - TherapeuticResponseProposition(**proposition) - elif proposition_type == PropositionType.PROGNOSTIC: - proposition = PrognosticProposition(**proposition) - elif proposition_type == 
PropositionType.DIAGNOSTIC: - proposition = DiagnosticProposition(**proposition) - else: - raise ValueError(f"{proposition_type} is not a valid " - f"proposition type") - if proposition: - proposition_cache[p_id] = proposition - - def _get_variation_descriptor( - self, response: Dict, variation_descriptor: Node, - gene_context_by_id: bool = True) -> VariationDescriptor: - """Get variation descriptor - - :param Dict response: Query response object - :param variation_descriptor: Variation Descriptor Node - :param bool gene_context_by_id: `True` if gene_context field will be - a CURIE that reference's a gene descriptor. `False` if gene_context - field will be a gene descriptor - :return: Variation descriptor + :param gene: Gene query + :param warnings: A list of warnings for the search query. + :return: A normalized gene concept if it exists """ - keys = variation_descriptor.keys() - vid = variation_descriptor.get("id") - vd_params = { - "id": vid, - "label": variation_descriptor.get("label"), - "description": variation_descriptor.get("description"), - "variation_id": None, - "variation": None, - "gene_context": None, - "molecule_context": variation_descriptor.get("molecule_context"), - "structural_type": variation_descriptor.get("structural_type"), - "vrs_ref_allele_seq": variation_descriptor.get("vrs_ref_allele_seq"), # noqa: E501 - "expressions": [], - "xrefs": variation_descriptor.get("xrefs"), - "alternate_labels": variation_descriptor.get("alternate_labels"), - "extensions": [] - } - - # Get Gene Descriptor / gene context - with self.driver.session() as session: - gene_descriptor = session.read_transaction( - self._get_variation_descriptors_gene, vd_params["id"] - ) - gene_descriptor_id = gene_descriptor.get("id") - - gene_value_object = session.read_transaction( - self._find_descriptor_value_object, - gene_descriptor_id - ) - gene_context = self._get_gene_descriptor( - gene_descriptor, gene_value_object) - - if gene_context_by_id: - # Reference gene descriptor 
by id - vd_params["gene_context"] = gene_descriptor_id - else: - # gene context will be gene descriptor - vd_params["gene_context"] = gene_context - - if "gene_descriptors" in response and\ - gene_descriptor_id not in response["gene_descriptors"]: - response["gene_descriptors"].append(gene_context) - - # Get Variation Descriptor Expressions - for key in ["expressions_g", "expressions_p", - "expressions_c"]: - if key in keys: - for value in variation_descriptor.get(key): - vd_params["expressions"].append( - Expression( - syntax=f"hgvs.{key.split('_')[-1]}", - value=value - ).dict() - ) - if not vd_params["expressions"]: - del vd_params["expressions"] - - # Get Variation Descriptor Extensions - if vd_params["id"].startswith("civic.vid"): - for field in ["civic_representative_coordinate", - "civic_actionability_score"]: - if field in keys: - vd_params["extensions"].append( - Extension( - name=field, - value=json.loads(variation_descriptor.get(field)) - ).dict() - ) - with self.driver.session() as session: - variant_group = session.read_transaction( - self._get_variation_group, vid - ) - if variant_group: - variant_group = variant_group[0] - vg = Extension( - name="variant_group", - value=[{ - "id": variant_group.get("id"), - "label": variant_group.get("label"), - "description": variant_group.get("description"), - "type": "variant_group" - }] - ).dict() - for v in vg["value"]: - if not v["description"]: - del v["description"] - vd_params["extensions"].append(vg) - elif vd_params["id"].startswith("moa.variant"): - for field in ["moa_representative_coordinate", "moa_rsid"]: - if field in keys: - vd_params["extensions"].append( - Extension( - name=field, - value=json.loads(variation_descriptor.get(field)) - ).dict() - ) - - with self.driver.session() as session: - value_object = session.read_transaction( - self._find_descriptor_value_object, vd_params["id"] - ) - vd_params["variation_id"] = value_object.get("id") - vd_params["variation"] = 
json.loads(value_object["variation"]) - return VariationDescriptor(**vd_params) + _, normalized_gene_id = self.vicc_normalizers.normalize_gene([gene]) + if not normalized_gene_id: + warnings.append(f"Gene Normalizer unable to normalize: {gene}") + return normalized_gene_id @staticmethod - def _get_variation_group(tx: Transaction, vid: str) -> Optional[Record]: - """Get a variation descriptor's variation group. + def _get_study_by_id(tx: Transaction, study_id: str) -> Optional[Node]: + """Get a Study node by ID. - :param Transaction tx: Neo4j session transaction - :param str vid: variation descriptor ID - :return: query record, containing variation group node if successful + :param tx: Neo4j session transaction object + :param study_id: Study ID to retrieve + :return: Study node if successful """ - query = ( - "MATCH (vd:VariationDescriptor)-[:IN_VARIATION_GROUP]->(vg:VariationGroup) " # noqa: E501 - f"WHERE toLower(vd.id) = toLower('{vid}') " - "RETURN vg" - ) - return tx.run(query).single() - - @staticmethod - def _get_variation_descriptors_gene(tx: Transaction, - vid: str) -> Optional[Node]: - """Get a Variation Descriptor's Gene Descriptor. - :param Transaction tx: Neo4j session transaction - :param str vid: variation descriptor ID - :return: Gene descriptor Node if successful + query = f""" + MATCH (s:Study) + WHERE toLower(s.id) = toLower('{study_id}') + RETURN s """ - query = ( - "MATCH (vd:VariationDescriptor)-[:HAS_GENE]->(gd:GeneDescriptor) " - f"WHERE toLower(vd.id) = toLower('{vid}') " - "RETURN gd" - ) - return tx.run(query).single()[0] + return (tx.run(query).single() or [None])[0] @staticmethod - def _get_gene_descriptor(gene_descriptor: Node, - gene_value_object: Node) -> GeneDescriptor: - """Add gene descriptor to response. 
+ def _get_related_studies( + tx: Transaction, + normalized_variation: Optional[str] = None, + normalized_therapy: Optional[str] = None, + normalized_disease: Optional[str] = None, + normalized_gene: Optional[str] = None + ) -> List[Node]: + """Get studies that contain queried normalized concepts. - :param Node gene_descriptor: Gene Descriptor Node - :param Node gene_value_object: Gene Node - :return: GeneDescriptor object + :param tx: Neo4j session transaction object + :param normalized_variation: VRS Variation ID + :param normalized_therapy: normalized therapy concept ID + :param normalized_disease: normalized disease concept ID + :param normalized_gene: normalized gene concept ID + :return: List of Study nodes matching given parameters """ - gd_params = { - "id": gene_descriptor.get("id"), - "type": "GeneDescriptor", - "label": gene_descriptor.get("label"), - "description": gene_descriptor.get("description"), - "gene_id": gene_value_object.get("id"), - "alternate_labels": gene_descriptor.get("alternate_labels"), - "xrefs": gene_descriptor.get("xrefs") - } + query = "MATCH (s:Study)" + params: Dict[str, str] = {} - return GeneDescriptor(**gd_params) + if normalized_variation: + query += """ + MATCH (s) -[:HAS_VARIANT] -> (cv:CategoricalVariation) + MATCH (cv) -[:HAS_DEFINING_CONTEXT|:HAS_MEMBERS] -> (v:Variation {id:$v_id}) + """ + params["v_id"] = normalized_variation - def _get_therapy_descriptor( - self, therapy_descriptor: Node - ) -> ValueObjectDescriptor: - """Get therapy descriptor. 
+ if normalized_disease: + query += """ + MATCH (s) -[:HAS_TUMOR_TYPE] -> (c:Condition {disease_normalizer_id:$c_id}) + """ + params["c_id"] = normalized_disease - :param Node therapy_descriptor: Therapy Descriptor Node - :return: Value Object Descriptor for therapy - """ - td_params = { - "id": therapy_descriptor.get("id"), - "type": "TherapyDescriptor", - "label": therapy_descriptor.get("label"), - "therapy_id": None, - "alternate_labels": therapy_descriptor.get("alternate_labels"), - "xrefs": therapy_descriptor.get("xrefs"), - "extensions": [] - } + if normalized_gene: + query += """ + MATCH (s) -[:HAS_GENE_CONTEXT] -> (g:Gene {gene_normalizer_id:$g_id}) + """ + params["g_id"] = normalized_gene - key = "regulatory_approval" - val = therapy_descriptor.get(key) - if val: - td_params["extensions"].append(Extension(name=key, value=json.loads(val))) + if normalized_therapy: + query += """ + MATCH (s1:Study) -[:HAS_THERAPEUTIC] ->( + tp:TherapeuticAgent {therapy_normalizer_id:$t_id}) + RETURN s1 as s + UNION + MATCH (s2:Study) -[:HAS_THERAPEUTIC]-> () - [:HAS_SUBSTITUTES| + HAS_COMPONENTS] ->(ta:TherapeuticAgent {therapy_normalizer_id:$t_id}) + RETURN s2 as s + """ + params["t_id"] = normalized_therapy else: - del td_params["extensions"] + query += "RETURN s" - with self.driver.session() as session: - value_object = session.read_transaction( - self._find_descriptor_value_object, td_params["id"] - ) - td_params["therapy_id"] = value_object.get("id") - - return ValueObjectDescriptor(**td_params) + return [s[0] for s in tx.run(query, **params)] - def _get_disease_descriptor( - self, disease_descriptor: Node - ) -> ValueObjectDescriptor: - """Get disease descriptor. + def _get_nested_studies( + self, + tx: Transaction, + study_nodes: List[Node] + ) -> List[Dict]: + """Get a list of nested studies. 
- :param Node disease_descriptor: Disease Descriptor Node - :return: Value Object Descriptor for disease + :param tx: Neo4j session transaction object + :param study_nodes: A list of Study Nodes + :return: A list of nested studies """ - dd_params = { - "id": disease_descriptor.get("id"), - "type": "DiseaseDescriptor", - "label": disease_descriptor.get("label"), - "disease_id": None, - "xrefs": disease_descriptor.get("xrefs") - } - - with self.driver.session() as session: - value_object = session.read_transaction( - self._find_descriptor_value_object, dd_params["id"] - ) - dd_params["disease_id"] = value_object.get("id") + nested_studies = [] + added_studies = set() + for s in study_nodes: + s_id = s.get("id") + if s_id not in added_studies: + try: + nested_study = self._get_nested_study(tx, s) + except ValidationError as e: + logger.warning("%s: %s", s_id, e) + else: + if nested_study: + nested_studies.append(nested_study) + added_studies.add(s_id) - return ValueObjectDescriptor(**dd_params) + return nested_studies - @staticmethod - def _get_method(method: Node) -> Method: - """Get method + def _get_nested_study(self, tx: Transaction, s: Node) -> Dict: + """Get information related to a study + Only VariantTherapeuticResponseStudy are supported at the moment - :param Node method: Method Node - :return: Method + :param tx: Neo4j session transaction object + :param Node s: Study Node + :return: Nested study """ - params = dict() - for key in method.keys(): - try: - params[key] = json.loads(method.get(key)) - except JSONDecodeError: - params[key] = method.get(key) - - return Method(**params) - - @staticmethod - def _get_document(document: Node) -> Optional[Document]: - """Add document to response. 
+ if s["type"] != "VariantTherapeuticResponseStudy": + return {} - :param Node document: Document Node - :return: Document if node has type `Document` - """ - label, *_ = document.labels - if label != "Document": - return None + params = { + "tumorType": None, + "variant": None, + "strength": None, + "isReportedIn": [], + "specifiedBy": None + } + params.update(s) + study_id = s["id"] + + # Get relationship and nodes for a study + query = f""" + MATCH (s:Study {{ id:'{study_id}' }}) + OPTIONAL MATCH (s)-[r]-(n) + RETURN type(r) as r_type, n; + """ + nodes_and_rels = tx.run(query).data() + + for item in nodes_and_rels: + rel_type = item["r_type"] + node = item["n"] + + if rel_type == "HAS_TUMOR_TYPE": + params["tumorType"] = self._get_disease(node) + elif rel_type == "HAS_VARIANT": + params["variant"] = self._get_cat_var(tx, node) + elif rel_type == "HAS_GENE_CONTEXT": + params["qualifiers"] = self._get_variant_onco_study_qualifier( + tx, study_id, s.get("alleleOrigin") + ) + elif rel_type == "IS_SPECIFIED_BY": + node["isReportedIn"] = self._get_method_document(tx, node["id"]) + params["specifiedBy"] = Method(**node) + elif rel_type == "IS_REPORTED_IN": + params["isReportedIn"].append(self._get_document(node)) + elif rel_type == "HAS_STRENGTH": + params["strength"] = core_models.Coding(**node) + elif rel_type == "HAS_THERAPEUTIC": + params["therapeutic"] = self._get_therapeutic_procedure(tx, node) + else: + logger.warning("relation type not supported: %s", rel_type) - params = dict() - for key in document.keys(): - params[key] = document.get(key) - return Document(**params) + return VariantTherapeuticResponseStudy(**params).model_dump() @staticmethod - def _find_node_by_id(tx: Transaction, node_id: str) -> Optional[Node]: - """Find a node by its ID. 
- :param Transaction tx: Neo4j session transaction object - :param str node_id: ID of node to retrieve - :return: Node object if successful - """ - query = ( - "MATCH (n) " - f"WHERE toLower(n.id) = toLower('{node_id}') " - "RETURN n" - ) - return (tx.run(query).single() or [None])[0] + def _get_disease(node: Dict) -> core_models.Disease: + """Get disease data from a node with relationship ``HAS_TUMOR_TYPE`` - @staticmethod - def _find_descriptor_value_object(tx: Transaction, - descriptor_id: str) -> Optional[Node]: - """Find a Descriptor's value object. - :param Transaction tx: Neo4j session transaction object - :param str descriptor_id: ID of descriptor to look up - :return: Node of value object described by descriptor if successful - """ - query = ( - "MATCH (d)-[:DESCRIBES]->(v)" - f"WHERE toLower(d.id) = toLower('{descriptor_id}') " - "RETURN v" - ) - return tx.run(query).single()[0] - - def add_proposition_and_statement_nodes( - self, session, statement_id: str, proposition_nodes: List, - statement_nodes: List): - """Get statements found in `supported_by` and their propositions - and add to corresponding list. - - :param session: Session - :param str statement_id: Statement ID - :param List proposition_nodes: List of propositions - :param List statement_nodes: List of statements + :param node: Disease node data. This will be mutated. 
+ :return: Disease data """ - supported_by_statements = session.read_transaction( - self._find_and_return_supported_by, statement_id, - only_statement=True - ) - for s in supported_by_statements: - if s not in statement_nodes: - statement_nodes.append(s) - proposition = session.read_transaction( - self._find_and_return_propositions_from_statement, - s.get("id") + _update_mappings(node) + node["extensions"] = [ + core_models.Extension( + name="disease_normalizer_id", + value=node["disease_normalizer_id"] + ) + ] + return core_models.Disease(**node) + + def _get_cat_var(self, tx: Transaction, node: Dict) -> CategoricalVariation: + """Get categorical variation data from a node with relationship ``HAS_VARIANT`` + + :param tx: Neo4j session transaction object + :param node: Variant node data. This will be mutated. + :return: Categorical Variation data + """ + _update_mappings(node) + + extensions = [] + for node_key, ext_name in ( + ("moa_representative_coordinate", "MOA representative coordinate"), + ("civic_representative_coordinate", "CIViC representative coordinate"), + ("civic_molecular_profile_score", "CIViC Molecular Profile Score"), + ("variant_types", "Variant types") + ): + node_val = node.get(node_key) + if node_val: + try: + ext_val = json.loads(node_val) + except TypeError: + ext_val = node_val + extensions.append( + core_models.Extension( + name=ext_name, + value=ext_val + ) ) - if proposition and proposition \ - not in proposition_nodes: - proposition_nodes.append(proposition) - - @staticmethod - def _get_statement_by_id(tx: Transaction, - statement_id: str) -> Optional[Node]: - """Get a Statement node by ID. 
- - :param Transaction tx: Neo4j session transaction object - :param str statement_id: statemend ID to retrieve - :return: statement node if successful - """ - query = ( - "MATCH (s:Statement) " - f"WHERE toLower(s.id) = toLower('{statement_id}') " - "RETURN s" + if node_key.startswith(SourceName.MOA.value): + # Cant be civic + break + + node["extensions"] = extensions or None + node["definingContext"] = self._get_variations( + tx, node["id"], VariationRelation.HAS_DEFINING_CONTEXT + )[0] + node["members"] = self._get_variations( + tx, node["id"], VariationRelation.HAS_MEMBERS ) - return (tx.run(query).single() or [None])[0] + return CategoricalVariation(**node) @staticmethod - def _get_propositions( - tx: Transaction, - statement_id: str = "", - normalized_variation: str = "", - normalized_therapy: str = "", - normalized_disease: str = "", - normalized_gene: str = "", - prop_type: Optional[PropositionType] = None, - pred: Optional[Predicate] = None - ) -> List[Node]: - """Get propositions that contain normalized concepts queried. Used - as callback for Neo4j session API. 
- - :param Transaction tx: Neo4j session transaction object - :param str statement_id: statement ID as stored in DB - :param str normalized_variation: variation VRS ID - :param str normalized_therapy: normalized therapy concept ID - :param str normalized_disease: normalized disease concept ID - :param str normalized_gene: normalized gene concept ID - :param Optional[PropositionType] prop_type: type of proposition - :param Optional[Predicate] pred: predicate value - :returns: List of nodes matching given parameters - """ - query = "" - params: Dict[str, str] = {} - if prop_type and pred: - query += \ - "MATCH (p:Proposition {type:$prop_type, predicate:$pred}) " - params["prop_type"] = prop_type.value - params["pred"] = pred.value - elif prop_type: - query += "MATCH (p:Proposition {type:$prop_type}) " - params["prop_type"] = prop_type.value - elif pred: - query += "MATCH (p:Proposition {predicate:$pred}) " - params["pred"] = pred.value - if statement_id: - query += "MATCH (:Statement {id:$s_id})-[:DEFINED_BY]-> (p:Proposition) " # noqa: E501 - params["s_id"] = statement_id - if normalized_therapy: - query += \ - "MATCH (p:Proposition)<-[:IS_OBJECT_OF]-(:Therapy {id:$t_id}) " - params["t_id"] = normalized_therapy - if normalized_variation: - lower_normalized_variation = normalized_variation.lower() - query += "MATCH (p:Proposition)<-[:IS_SUBJECT_OF]-(v:Variation " - if lower_normalized_variation.startswith('ga4gh:sq.'): - # Sequence ID - query += "{location_sequence_id: $v_id}) " - else: - query += "{id:$v_id}) " - params["v_id"] = normalized_variation - if normalized_disease: - query += "MATCH (p:Proposition)<-[:IS_OBJECT_QUALIFIER_OF]-(:Disease {id:$d_id}) " # noqa: E501 - params["d_id"] = normalized_disease - if normalized_gene: - query += "MATCH (:Gene {id:$g_id})<-[:DESCRIBES]-" \ - "(:GeneDescriptor)<-[:HAS_GENE]-" \ - "(:VariationDescriptor)-[:DESCRIBES]->(v:Variation)-" \ - "[:IS_SUBJECT_OF]->(p:Proposition) " - params["g_id"] = normalized_gene - query += 
"RETURN DISTINCT p" - return [p[0] for p in tx.run(query, **params)] - - @staticmethod - def _get_statements_from_proposition(tx: Transaction, - proposition_id: str) -> List[Node]: - """Get statements that are defined by a proposition. - - :param Transaction tx: Neo4j session transaction object - :param str proposition_id: ID for proposition to retrieve associated - statements from - :return: List of statement Nodes - """ - query = ( - "MATCH (p:Proposition {id: $proposition_id})<-[:DEFINED_BY]-(s:Statement) " # noqa: E501 - "RETURN DISTINCT s" - ) - return [s[0] for s in tx.run(query, proposition_id=proposition_id)] + def _get_variations( + tx: Transaction, + cv_id: str, + relation: VariationRelation + ) -> List[Dict]: + """Get list of variations associated to categorical variation + + :param tx: Neo4j session transaction object + :param cv_id: ID for categorical variation + :param relation: Relation type for categorical variation and variation + :return: List of variations with `relation` to categorical variation. If + VariationRelation.HAS_MEMBERS, returns at least one variation. Otherwise, + returns exactly one variation + """ + query = f""" + MATCH (v:Variation) <- [:{relation.value}] - (cv:CategoricalVariation + {{ id: '{cv_id}' }}) + MATCH (loc:Location) <- [:HAS_LOCATION] - (v) + RETURN v, loc + """ + results = tx.run(query) + variations = [] + for r in results: + r_params = r.data() + v_params = r_params["v"] + expressions = [] + for variation_k, variation_v in v_params.items(): + if variation_k == "state": + v_params[variation_k] = json.loads(variation_v) + elif variation_k.startswith("expression_hgvs_"): + syntax = variation_k.split("expression_")[-1].replace("_", ".") + for hgvs_expr in variation_v: + expressions.append( + models.Expression( + syntax=syntax, + value=hgvs_expr + ) + ) - def get_statement_response(self, - statement_nodes: List[Node]) -> List[Dict]: - """Return a list of statements from Statement and Proposition nodes. 
+ v_params["expressions"] = expressions or None + loc_params = r_params["loc"] + v_params["location"] = loc_params + v_params["location"]["sequenceReference"] = json.loads(loc_params["sequence_reference"]) # noqa: E501 + variations.append(models.Variation(**v_params).model_dump()) + return variations - :param List statement_nodes: A list of Statement Nodes - :return: A list of dicts containing statement response output - """ - statements_response = list() - added_statements = set() - for s in statement_nodes: - s_id = s.get("id") - if s_id not in added_statements: - statements_response.append( - self._get_statement(s) - ) - added_statements.add(s_id) + @staticmethod + def _get_variant_onco_study_qualifier( + tx: Transaction, + study_id: str, + allele_origin: Optional[str] + ) -> _VariantOncogenicityStudyQualifier: + """Get variant oncogenicity study qualifier data for a study + + :param tx: Neo4j session transaction object + :param study_id: ID of study node + :param allele_origin: Study's allele origin + :return Variant oncogenicity study qualifier data + """ + query = f""" + MATCH (s:Study {{ id: '{study_id}' }}) -[:HAS_GENE_CONTEXT] -> (g:Gene) + RETURN g + """ + record = tx.run(query).single() + if not record: + return None - return statements_response + gene_params = record.data()["g"] + _update_mappings(gene_params) - @staticmethod - def _find_and_return_statement_response( - tx: Transaction, statement_id: str - ) -> Optional[Record]: - """Return IDs and method related to a Statement. 
- :param Transaction tx: Neo4j session transaction object - :param str statement_id: ID of statement to retrieve - :return: Record containing descriptors, methods, and propositions - associated with statement if successful - """ - queries = ( - ("MATCH (s)-[r1]->(td:TherapyDescriptor) ", "td.id AS tid,"), - ("", "") - ) - for q in queries: - query = ( - "MATCH (s:Statement) " - f"WHERE s.id = '{statement_id}' " - f"{q[0]}" - "MATCH (s)-[r2]->(vd:VariationDescriptor) " - "MATCH (s)-[r3]->(dd:DiseaseDescriptor) " - "MATCH (s)-[r4]->(m:Method) " - "MATCH (s)-[r6]->(p:Proposition) " - f"RETURN {q[1]} vd.id AS vid, dd.id AS did, m," - " p.id AS p_id" + gene_params["extensions"] = [ + core_models.Extension( + name="gene_normalizer_id", + value=gene_params["gene_normalizer_id"] ) - result = tx.run(query).single() - if result: - return result - return None - - def get_propositions_response( - self, proposition_nodes: List[Node] - ) -> List[Proposition]: - """Return a list of propositions from Proposition nodes. 
- - :param List[Node] proposition_nodes: A list of Proposition Nodes - :return: A list of Propositions - """ - propositions_response = list() - for p in proposition_nodes: - proposition = self._get_proposition(p) - if proposition and proposition not in propositions_response: - propositions_response.append(proposition) - return propositions_response + ] - @staticmethod - def _find_and_return_proposition_response(tx: Transaction, - proposition_id: str) -> Record: - """Return value ids from a proposition.""" - queries = ( - ("MATCH (n) -[r1]-> (t:Therapy) ", "t.id AS object,"), ("", "") + return _VariantOncogenicityStudyQualifier( + alleleOrigin=allele_origin, + geneContext=core_models.Gene(**gene_params) ) - for q in queries: - query = ( - f"MATCH (n) " - f"WHERE n.id = '{proposition_id}' " - f"{q[0]}" - "MATCH (n) -[r2]-> (v:Variation) " - "MATCH (n) -[r3]-> (d:Disease) " - f"RETURN {q[1]} v.id AS subject, d.id AS object_qualifier" - ) - result = tx.run(query).single() - if result: - return result - return None @staticmethod - def _find_and_return_supported_by( - tx: Transaction, statement_id: str, only_statement: bool = False - ) -> List[Record]: - """Retrieve Statement and Document Nodes that support a given - Statement. 
- - :param Transaction tx: Neo4j session transaction object - :param str statement_id: ID of original statement - :param bool only_statement: `True` if only match on Statement, - `False` if match on both Statement and Document - :return: List of Records that support provided statement - """ - if not only_statement: - match = "MATCH (s:Statement)-[:CITES]->(sb) " - else: - match = "MATCH (s:Statement)-[:CITES]->(sb:Statement) " - query = ( - f"{match}" - f"WHERE s.id = '{statement_id}' " - "RETURN sb" - ) - return [se[0] for se in tx.run(query)] + def _get_method_document(tx: Transaction, method_id: str) -> Optional[Document]: + """Get document for a given method - @staticmethod - def _find_and_return_propositions_from_statement( - tx: Transaction, statement_id: str - ) -> Optional[Node]: - """Find propositions from a given statement. - - :param Transaction tx: Neo4j session transaction object - :param statement_id str: statement ID to get propositions for - :return: Node containing supported proposition if successful + :param tx: Neo4j session transaction object + :param method_id: ID for method + :return: Document """ - query = ( - "MATCH (p:Proposition)<-[:DEFINED_BY]-(s:Statement) " - f"WHERE toLower(s.id) = toLower('{statement_id}') " - "RETURN p" - ) - return (tx.run(query).single() or [None])[0] + query = f""" + MATCH (m:Method {{ id: '{method_id}' }}) -[:IS_REPORTED_IN] -> (d:Document) + RETURN d + """ + record = tx.run(query).single() + if not record: + return None - def _get_gene_value_object(self, node: Node) -> Node: - """Get gene value object from gene descriptor object + doc_params = record.data()["d"] + return Document(**doc_params) + + @staticmethod + def _get_document(node: Dict) -> Document: + """Get document data from a node with relationship ``IS_SPECIFIED_BY`` - :param Node node: gene descriptor object - :return: gene value object + :param node: Document node data. 
This will be mutated + :return: Document data """ - with self.driver.session() as session: - gene_value_object = session.read_transaction( - self._find_descriptor_value_object, node.get("id") - ) - return gene_value_object + _update_mappings(node) - def _get_proposition(self, p: Node) -> Proposition: - """Return a proposition. + source_type = node.get("source_type") + if source_type: + node["extensions"] = [ + core_models.Extension( + name="source_type", + value=source_type + ) + ] + return Document(**node) + + def _get_therapeutic_procedure( + self, + tx: Transaction, + node: Dict, + ) -> Optional[core_models.TherapeuticProcedure]: + """Get therapeutic procedure from a node with relationship ``HAS_THERAPEUTIC`` + + :param tx: Neo4j session transaction object + :param node: Therapeutic node data. This will be mutated. + :return: Therapeutic procedure if node type is supported. Currently, therapeutic + action is not supported. + """ + node_type = node["type"] + if node_type in {"CombinationTherapy", "TherapeuticSubstituteGroup"}: + civic_therapy_interaction_type = node.get("civic_therapy_interaction_type") + if civic_therapy_interaction_type: + node["extensions"] = [ + core_models.Extension( + name="civic_therapy_interaction_type", + value=civic_therapy_interaction_type + ) + ] - :param Node p: Proposition Node - :return: A proposition - :raise: ValueError if unrecognized proposition type - """ - with self.driver.session() as session: - p_id = p.get("id") - p_type = p.get("type") - proposition = None - value_ids = session.read_transaction( - self._find_and_return_proposition_response, p_id - ) - params = { - "id": p_id, - "type": p_type, - "predicate": p.get("predicate"), - "subject": value_ids["subject"], - "object_qualifier": value_ids["object_qualifier"] - } - if p_type == PropositionType.PREDICTIVE: - params["object"] = value_ids["object"] - proposition = \ - TherapeuticResponseProposition(**params) - elif p_type == PropositionType.PROGNOSTIC: - proposition = 
PrognosticProposition(**params) - elif p_type == PropositionType.DIAGNOSTIC: - proposition = DiagnosticProposition(**params) + if node_type == "CombinationTherapy": + node["components"] = self._get_therapeutic_agents( + tx, node["id"], TherapeuticProcedureType.COMBINATION, + TherapeuticRelation.HAS_COMPONENTS + ) else: - raise ValueError - return proposition - - def _get_statement(self, s: Node) -> Dict: - """Return a statement. + node["substitutes"] = self._get_therapeutic_agents( + tx, node["id"], TherapeuticProcedureType.SUBSTITUTES, + TherapeuticRelation.HAS_SUBSTITUTES + ) - :param Node s: Statement Node - :return: Dict containing values from s - """ - with self.driver.session() as session: - statement_id = s.get("id") - response = session.read_transaction( - self._find_and_return_statement_response, statement_id) - se_list = session.read_transaction( - self._find_and_return_supported_by, statement_id) - - statement = StatementResponse( - id=statement_id, - description=s.get("description"), - direction=s.get("direction"), - evidence_level=s.get("evidence_level"), - variation_origin=s.get("variation_origin"), - proposition=response["p_id"], - variation_descriptor=response["vid"], - therapy_descriptor=response["tid"] if "tid" in response.keys() else None, # noqa: E501 - disease_descriptor=response["did"], - method=response["m"]["id"], - supported_by=[se["id"] for se in se_list] - ).dict(exclude_none=True) - return statement + therapeutic = core_models.TherapeuticProcedure(**node) + elif node_type == "TherapeuticAgent": + therapeutic = self._get_therapeutic_agent(node) + else: + logger.warning("node type not supported: %s", node_type) + therapeutic = None + + return therapeutic + + def _get_therapeutic_agents( + self, + tx: Transaction, + tp_id: str, + tp_type: TherapeuticProcedureType, + tp_relation: TherapeuticRelation + ) -> List[core_models.TherapeuticAgent]: + """Get list of therapeutic agents for therapeutic combination or substitutes + group + + :param 
tp_id: ID for combination therapy or therapeutic substitute group + :param tp_type: Therapeutic Procedure type + :param tp_relation: Relationship type for therapeutic procedure and therapeutic + agent + :return: List of Therapeutic Agents for a combination therapy or therapeutic + substitute group + """ + query = f""" + MATCH (tp:{tp_type.value} {{ id: '{tp_id}' }}) -[:{tp_relation.value}] + -> (ta:TherapeuticAgent) + RETURN ta + """ + therapeutic_agents = [] + results = tx.run(query) + for r in results: + r_params = r.data() + ta_params = r_params["ta"] + ta = self._get_therapeutic_agent(ta_params) + therapeutic_agents.append(ta) + return therapeutic_agents @staticmethod - def _get_documents(tx: Transaction, **parameters) -> List[Node]: - """Get Document nodes matching provided parameters. Provide as callback - to Neo4j session methods. - :param Transaction tx: session transaction - :param Dict parameters: document properties and values - :return: List of nodes matching given parameters - :raise TypeError: if no parameters given - """ - params_strings = [] - if len(parameters) == 0: - raise TypeError("Must provide at least one parameter") - for key in parameters.keys(): - params_strings.append(f"{key}:${key}") - query_string = f""" - MATCH (d:Document {{ {','.join(params_strings)} }}) - RETURN d - """ - return [p[0] for p in tx.run(query_string, **parameters)] + def _get_therapeutic_agent(in_ta_params: Dict) -> core_models.TherapeuticAgent: + """Transform input parameters into TherapeuticAgent object + + :param in_ta_params: Therapeutic Agent node properties + :return: TherapeuticAgent + """ + ta_params = copy(in_ta_params) + _update_mappings(ta_params) + extensions = [ + core_models.Extension( + name="therapy_normalizer_id", + value=ta_params["therapy_normalizer_id"] + ) + ] + regulatory_approval = ta_params.get("regulatory_approval") + if regulatory_approval: + regulatory_approval = json.loads(regulatory_approval) + extensions.append( + core_models.Extension( + 
name="regulatory_approval", + value=regulatory_approval + ) + ) + + ta_params["extensions"] = extensions + return core_models.TherapeuticAgent(**ta_params) diff --git a/metakb/schemas/api.py b/metakb/schemas/api.py new file mode 100644 index 00000000..f80ea0fd --- /dev/null +++ b/metakb/schemas/api.py @@ -0,0 +1,47 @@ +"""Create schemas for API""" +from typing import List, Literal, Optional + +from pydantic import BaseModel, ConfigDict, StrictStr + +from metakb.schemas.variation_statement import VariantTherapeuticResponseStudy +from metakb.version import __version__ + + +class ServiceMeta(BaseModel): + """Metadata for MetaKB service.""" + + name: Literal["metakb"] = "metakb" + version: StrictStr = __version__ + url: Literal[ + "https://github.com/cancervariants/metakb" + ] = "https://github.com/cancervariants/metakb" + + model_config = ConfigDict( + json_schema_extra={ + "example": { + "name": "metakb", + "version": __version__, + "url": "https://github.com/cancervariants/metakb" + } + } + ) + + +class SearchStudiesQuery(BaseModel): + """Queries for the Search Studies Endpoint.""" + + variation: Optional[StrictStr] = None + disease: Optional[StrictStr] = None + therapy: Optional[StrictStr] = None + gene: Optional[StrictStr] = None + study_id: Optional[StrictStr] = None + + +class SearchStudiesService(BaseModel): + """Define model for Search Studies Endpoint Response.""" + + query: SearchStudiesQuery + warnings: List[StrictStr] = [] + study_ids: List[StrictStr] = [] + studies: List[VariantTherapeuticResponseStudy] = [] + service_meta_: ServiceMeta diff --git a/metakb/version.py b/metakb/version.py index 0d6b9718..8ee3c9cf 100644 --- a/metakb/version.py +++ b/metakb/version.py @@ -1,4 +1,2 @@ """MetaKB version""" -# REQ: EACH TIME VERSION IS UPDATED, MUST ALSO UPDATE LAST_UPDATED -__version__ = "1.1.0" -LAST_UPDATED = "2022-11-08" +__version__ = "2.0.0-dev0" diff --git a/tests/conftest.py b/tests/conftest.py index 93fe1fd7..25d2edae 100644 --- a/tests/conftest.py +++ 
b/tests/conftest.py @@ -1,21 +1,12 @@ """Module for pytest fixtures.""" +from copy import deepcopy import pytest import os -import asyncio import json -# from metakb.query import QueryHandler from metakb.normalizers import ViccNormalizers -@pytest.fixture(scope="session") -def event_loop(request): - """Create an instance of the default event loop for each test case.""" - loop = asyncio.get_event_loop_policy().new_event_loop() - yield loop - loop.close() - - @pytest.fixture(scope="session") def cetuximab_extensions(): """Create test fixture for cetuximab extensions""" @@ -1812,14 +1803,14 @@ def pmid_27819322(): } -@pytest.fixture(scope="module") -def moa_aid67_study( - moa_vid67, moa_abl1, moa_imatinib, moa_chronic_myelogenous_leukemia, moa_method, +@pytest.fixture(scope="session") +def moa_aid66_study( + moa_vid66, moa_abl1, moa_imatinib, moa_chronic_myelogenous_leukemia, moa_method, moa_source44 ): - """Create a Variant Therapeutic Response Study test fixture for MOA Assertion 67.""" + """Create a Variant Therapeutic Response Study test fixture for MOA Assertion 66.""" return { - "id": "moa.assertion:67", + "id": "moa.assertion:66", "description": "T315I mutant ABL1 in p210 BCR-ABL cells resulted in retained high levels of phosphotyrosine at increasing concentrations of inhibitor STI-571, whereas wildtype appropriately received inhibition.", # noqa: E501 "direction": "none", "strength": { @@ -1828,7 +1819,7 @@ def moa_aid67_study( "system": "https://go.osu.edu/evidence-codes" }, "predicate": "predictsResistanceTo", - "variant": moa_vid67, + "variant": moa_vid66, "therapeutic": moa_imatinib, "tumorType": moa_chronic_myelogenous_leukemia, "qualifiers": { @@ -1841,11 +1832,11 @@ def moa_aid67_study( } -@pytest.fixture(scope="module") -def moa_vid67(): - """Create a test fixture for MOA VID67.""" +@pytest.fixture(scope="session") +def moa_vid66(): + """Create a test fixture for MOA VID66.""" return { - "id": "moa.variant:67", + "id": "moa.variant:66", "type": 
"ProteinSequenceConsequence", "label": "ABL1 p.T315I (Missense)", "definingContext": { @@ -1888,7 +1879,7 @@ def moa_vid67(): { "coding": { "system": "https://moalmanac.org/api/features/", - "code": "67" + "code": "66" }, "relation": "exactMatch" }, @@ -2153,50 +2144,94 @@ def moa_source44(): } -def _dict_check(expected_d: dict, actual_d: dict) -> None: +def _dict_check(expected_d: dict, actual_d: dict, is_cdm: bool = False) -> None: """Make dictionary assertion checks. Check that actual matches expected data. :param expected_d: Expected dictionary :param actual_d: Actual dictionary + :param is_cdm: Whether checks are for transformers (CDM) or query handler. + CDM have extra fields that are not exposed to the query handler """ for k, v in expected_d.items(): if isinstance(v, dict): - _dict_check(v, actual_d[k]) + _dict_check(v, actual_d[k], is_cdm=is_cdm) elif isinstance(v, list): actual_l = [json.dumps(v, sort_keys=True) for v in actual_d[k]] - expected_l = [json.dumps(v, sort_keys=True) for v in expected_d[k]] + if is_cdm: + expected_l = [json.dumps(v, sort_keys=True) for v in expected_d[k]] + else: + expected_l = [] + for v in expected_d[k]: + if isinstance(v, dict): + if v.get("name") in { + "therapy_normalizer_data", + "disease_normalizer_data" + }: + updated_ext = v.copy() + normalizer_data_type = v["name"].split("_normalizer_data")[0] # noqa: E501 + updated_ext["name"] = f"{normalizer_data_type}_normalizer_id" # noqa: E501 + updated_ext["value"] = v["value"]["normalized_id"] + expected_l.append(json.dumps(updated_ext, sort_keys=True)) + continue + else: + new_extensions = [] + extensions = v.get("extensions") or [] + for ext in extensions: + if ext.get("name") in { + "therapy_normalizer_data", + "disease_normalizer_data" + }: + normalizer_data_type = ext["name"].split("_normalizer_data")[0] # noqa: E501 + new_extensions.append( + { + "name": f"{normalizer_data_type}_normalizer_id", # noqa: E501 + "type": "Extension", + "value": ext["value"]["normalized_id"] 
+ } + ) + else: + new_extensions.append(ext) + if extensions: + v["extensions"] = new_extensions + expected_l.append(json.dumps(v, sort_keys=True)) assert set(actual_l) == set(expected_l), k else: assert actual_d[k] == expected_d[k], k -def assertion_checks(actual_data: list, test_data: list) -> None: +@pytest.fixture(scope="session") +def assertion_checks(): """Check that actual data matches expected data :param actual_data: List of actual data :param test_data: List of expected data + :param is_cdm: Whether checks are for transformers (CDM) or query handler. + CDM have extra fields that are not exposed to the query handler """ - assert len(actual_data) == len(test_data) - for expected in test_data: - found_match = False - for actual in actual_data: - if actual["id"] == expected["id"]: - found_match = True - assert actual.keys() == expected.keys() - _dict_check(expected, actual) - continue + def _check(actual_data: list, test_data: list, is_cdm: bool = False) -> None: + assert len(actual_data) == len(test_data) + for expected in test_data: + found_match = False + for actual in actual_data: + if actual["id"] == expected["id"]: + found_match = True + assert actual.keys() == expected.keys() + expected_copy = deepcopy(expected) + _dict_check(expected_copy, actual, is_cdm=is_cdm) + continue - assert found_match, f"Did not find {expected['id']} in response" + assert found_match, f"Did not find {expected['id']} in response" + return _check @pytest.fixture(scope="session") -def check_transformed_cdm(): +def check_transformed_cdm(assertion_checks): """Test fixture to compare CDM transformations.""" def check_transformed_cdm( data, studies, transformed_file ): """Test that transform to CDM works correctly.""" - assertion_checks(data["studies"], studies) + assertion_checks(data["studies"], studies, is_cdm=True) os.remove(transformed_file) return check_transformed_cdm @@ -2205,9 +2240,3 @@ def check_transformed_cdm( def normalizers(): """Provide normalizers to 
querying/transformation tests.""" return ViccNormalizers() - - -# @pytest.fixture(scope="session") -# def query_handler(normalizers): -# """Create query handler test fixture""" -# return QueryHandler(normalizers=normalizers) diff --git a/tests/data/transform/moa_harvester.json b/tests/data/transform/moa_harvester.json index 81686383..9dbb07f6 100644 --- a/tests/data/transform/moa_harvester.json +++ b/tests/data/transform/moa_harvester.json @@ -1,7 +1,7 @@ { "assertions": [ { - "id": 67, + "id": 66, "context": "", "description": "T315I mutant ABL1 in p210 BCR-ABL cells resulted in retained high levels of phosphotyrosine at increasing concentrations of inhibitor STI-571, whereas wildtype appropriately received inhibition.", "disease": { @@ -20,7 +20,7 @@ "validated": true, "source_ids": 44, "variant": { - "id": 67, + "id": 66, "alternate_allele": "T", "cdna_change": "c.944C>T", "chromosome": "9", @@ -95,7 +95,7 @@ ], "variants": [ { - "id": 67, + "id": 66, "alternate_allele": "T", "cdna_change": "c.944C>T", "chromosome": "9", diff --git a/tests/unit/test_query.py b/tests/unit/test_query.py deleted file mode 100644 index 67bf52ea..00000000 --- a/tests/unit/test_query.py +++ /dev/null @@ -1,683 +0,0 @@ -"""Test the MetaKB search method.""" -import pytest - -from metakb.version import __version__, LAST_UPDATED - -# TODO: -# Commented out tests to be fixed after first pass -# Load DB with test data - - -@pytest.mark.asyncio -async def return_response(query_handler, statement_id, **kwargs): - """Return the statement given ID if it exists.""" - response = await query_handler.search(**kwargs) - statements = response['statements'] - propositions = response['propositions'] - assert len(statements) != 0 - assert len(propositions) != 0 - assert len(response['matches']['statements']) != 0 - assert len(response['matches']['propositions']) != 0 - s = None - for statement in statements: - if statement['id'] == statement_id: - s = statement - break - - p = None - for proposition in 
propositions: - if s['proposition'] == proposition['id']: - p = proposition - break - return s, p - - -def assert_no_match(response): - """No match assertions for queried concepts in search.""" - assert response['statements'] == [] - assert response['propositions'] == [] - assert len(response['warnings']) > 0 - - -def assert_no_match_id(response): - """No match assertions for search by id.""" - assert len(response.keys()) == 3 - assert len(response['warnings']) > 0 - - -def assert_keys_for_detail_false(response_keys): - """Check that keys aren't in response when detail is false.""" - assert 'variation_descriptors' not in response_keys - assert 'gene_descriptors' not in response_keys - assert 'therapy_descriptors' not in response_keys - assert 'disease_descriptors' not in response_keys - assert 'methods' not in response_keys - assert 'documents' not in response_keys - - -def assert_keys_for_detail_true(response_keys, response, is_evidence=True, - tr_response=True): - """Check that keys are in response when detail is false.""" - fields = ['variation_descriptors', 'gene_descriptors', - 'disease_descriptors', 'methods', - 'documents', 'statements', 'propositions'] - if tr_response: - fields += ['therapy_descriptors'] - for field in fields: - assert field in response_keys - if is_evidence: - # Evidence only does not have supported_by with other statements - assert len(response[field]) == 1 - else: - assert len(response[field]) > 1 - - -def assert_response_items(response, statement, proposition, - variation_descriptor, gene_descriptor, - disease_descriptor, method, - document, therapy_descriptor, - check_statement, check_proposition, - check_variation_descriptor, - check_descriptor, check_method, check_document - ): - """Check that search response match expected values.""" - if therapy_descriptor: - assert_keys_for_detail_true(response.keys(), response) - else: - assert_keys_for_detail_true(response.keys(), response, - tr_response=False) - - response_statement = 
response['statements'][0] - response_proposition = response['propositions'][0] - response_variation_descriptor = response['variation_descriptors'][0] - response_gene_descriptor = response['gene_descriptors'][0] - if therapy_descriptor: - response_therapy_descriptor = response['therapy_descriptors'][0] - else: - response_therapy_descriptor = None - response_disease_descriptor = response['disease_descriptors'][0] - response_method = response['methods'][0] - response_document = response['documents'][0] - - check_statement(response_statement, statement) - check_proposition(response_proposition, proposition) - check_variation_descriptor(response_variation_descriptor, - variation_descriptor) - check_descriptor(gene_descriptor, response_gene_descriptor) - check_descriptor(disease_descriptor, response_disease_descriptor) - if therapy_descriptor: - check_descriptor(therapy_descriptor, response_therapy_descriptor) - check_method(response_method, method) - check_document(response_document, document) - - # Assert that IDs match in response items - assert response_statement['proposition'] == response_proposition['id'] - assert response_statement['variation_descriptor'] == \ - response_variation_descriptor['id'] - if therapy_descriptor: - assert response_statement['therapy_descriptor'] == \ - response_therapy_descriptor['id'] - assert response_statement['disease_descriptor'] == \ - response_disease_descriptor['id'] - assert response_statement['method'] == response_method['id'] - assert response_statement['supported_by'][0] == response_document['id'] - - assert proposition['subject'] == \ - response_variation_descriptor['variation_id'] - assert proposition['object_qualifier'] == \ - response_disease_descriptor['disease_id'] - if therapy_descriptor: - assert proposition['object'] == \ - response_therapy_descriptor['therapy_id'] - - assert response_variation_descriptor['gene_context'] == \ - response_gene_descriptor['id'] - - -def assert_general_search_queries(response): - """Check 
for general search queries.""" - assert response['matches'] - len_statement_matches = len(response['matches']['statements']) - assert len_statement_matches > 0 - assert len(response['matches']['propositions']) > 0 - len_statements = len(response['statements']) - assert len_statements > 0 - assert len_statement_matches == len_statements - assert len(response['propositions']) > 0 - assert len(response['methods']) > 0 - assert len(response['documents']) > 0 - - -def test_search_id(query_handler): - """Test that search id method works correctly.""" - resp = query_handler.search_by_id( - "proposition:xsTCVDo1bo2P_6Sext0Y3ibU3MPbiyXE" - ) - assert resp["proposition"] - assert not resp["warnings"] - assert query_handler.search_by_id("proposition:001")["warnings"] - assert query_handler.search_by_id("proposition:0")["warnings"] - assert query_handler.search_by_id("proposition:1")["warnings"] - - -@pytest.mark.asyncio -async def test_general_search_queries(query_handler): - """Test that queries do not return errors.""" - response = await query_handler.search(variation='braf v600e', detail=True) - assert_general_search_queries(response) - - response = await query_handler.search(variation='egfr l858r', detail=True) - assert_general_search_queries(response) - - response = await query_handler.search(disease='cancer', detail=True) - assert_general_search_queries(response) - - -@pytest.mark.asyncio -async def test_civic_eid2997(query_handler, civic_eid2997_statement, - civic_eid2997_proposition, check_statement, - check_proposition): - """Test search on CIViC Evidence Item 2997.""" - statement_id = 'civic.eid:2997' - - # Test search by Subject - s, p = await return_response(query_handler, statement_id, - variation='ga4gh:VA.kgjrhgf84CEndyLjKdAO0RxN-e3pJjxA') - check_statement(s, civic_eid2997_statement) - check_proposition(p, civic_eid2997_proposition) - - # Test search by Object - s, p = await return_response(query_handler, statement_id, therapy='rxcui:1430438') - 
check_statement(s, civic_eid2997_statement) - check_proposition(p, civic_eid2997_proposition) - - # Test search by Object Qualifier - s, p = await return_response(query_handler, statement_id, disease='ncit:C2926') - check_statement(s, civic_eid2997_statement) - check_proposition(p, civic_eid2997_proposition) - - # Test search by Gene Descriptor - # HGNC ID - s, p = await return_response(query_handler, statement_id, gene='hgnc:3236') - check_statement(s, civic_eid2997_statement) - check_proposition(p, civic_eid2997_proposition) - - # Label - s, p = await return_response(query_handler, statement_id, gene='EGFR') - check_statement(s, civic_eid2997_statement) - check_proposition(p, civic_eid2997_proposition) - - # Alt label - s, p = await return_response(query_handler, statement_id, gene='ERBB1') - check_statement(s, civic_eid2997_statement) - check_proposition(p, civic_eid2997_proposition) - - # Test search by Variation Descriptor - # Gene Symbol + Variant Name - s, p = await return_response(query_handler, statement_id, variation='EGFR L858R') - check_statement(s, civic_eid2997_statement) - check_proposition(p, civic_eid2997_proposition) - - # Alt Label - s, p = await return_response(query_handler, statement_id, - variation='egfr Leu858ARG') - check_statement(s, civic_eid2997_statement) - check_proposition(p, civic_eid2997_proposition) - - # HGVS Expression - s, p = await return_response(query_handler, statement_id, - variation='NP_005219.2:p.Leu858Arg') - check_statement(s, civic_eid2997_statement) - check_proposition(p, civic_eid2997_proposition) - - # Test search by Therapy Descriptor - # Label - s, p = await return_response(query_handler, statement_id, therapy='Afatinib') - check_statement(s, civic_eid2997_statement) - check_proposition(p, civic_eid2997_proposition) - - # Alt Label - s, p = await return_response(query_handler, statement_id, therapy='BIBW2992') - check_statement(s, civic_eid2997_statement) - check_proposition(p, civic_eid2997_proposition) - - # 
Test search by Disease Descriptor - # Label - s, p = await return_response(query_handler, statement_id, - disease='Lung Non-small Cell Carcinoma') - check_statement(s, civic_eid2997_statement) - check_proposition(p, civic_eid2997_proposition) - - -@pytest.mark.asyncio -async def test_civic_eid1409_statement(query_handler, civic_eid1409_statement, - check_statement): - """Test search on CIViC Evidence Item 1409.""" - statement_id = 'civic.eid:1409' - - # Test search by Subject - s, p = await return_response(query_handler, statement_id, - variation='ga4gh:VA.ZDdoQdURgO2Daj2NxLj4pcDnjiiAsfbO') - check_statement(s, civic_eid1409_statement) - - # Test search by Object - s, p = await return_response(query_handler, statement_id, therapy='ncit:C64768') - check_statement(s, civic_eid1409_statement) - - # Test search by Object Qualifier - s, p = await return_response(query_handler, statement_id, disease='ncit:C3510') - check_statement(s, civic_eid1409_statement) - - # Test search by Gene Descriptor - # HGNC ID - s, p = await return_response(query_handler, statement_id, gene='hgnc:1097') - check_statement(s, civic_eid1409_statement) - - # Label - s, p = await return_response(query_handler, statement_id, gene='BRAF') - check_statement(s, civic_eid1409_statement) - - # TODO: Not found in gene normalizer - # # Alt label - # s, p = await return_response(query_handler, - # statement_id, gene='NS7') - # assertions(civic_eid1409_statement, s) - - # Test search by Variation Descriptor - # Gene Symbol + Variant Name - s, p = await return_response(query_handler, statement_id, variation='BRAF V600E') - check_statement(s, civic_eid1409_statement) - - # # Alt Label - s, p = await return_response(query_handler, statement_id, - variation='braf val600glu') - check_statement(s, civic_eid1409_statement) - - # HGVS Expression - s, p = await return_response(query_handler, statement_id, - variation='NP_004324.2:p.Val600Glu') - check_statement(s, civic_eid1409_statement) - - # Test search by 
Therapy Descriptor - # Label - s, p = await return_response(query_handler, statement_id, therapy='Vemurafenib') - check_statement(s, civic_eid1409_statement) - - # # Alt Label - s, p = await return_response(query_handler, statement_id, - therapy='BRAF(V600E) Kinase Inhibitor RO5185426') - check_statement(s, civic_eid1409_statement) - - # Label - s, p = await return_response(query_handler, statement_id, disease='Skin Melanoma') - check_statement(s, civic_eid1409_statement) - - -@pytest.mark.asyncio -async def test_civic_aid6(query_handler, civic_aid6_statement, check_statement): - """Test search on CIViC Evidence Item 6.""" - statement_id = 'civic.aid:6' - - # Test search by Subject - s, p = await return_response(query_handler, statement_id, - variation='ga4gh:VA.kgjrhgf84CEndyLjKdAO0RxN-e3pJjxA') - check_statement(s, civic_aid6_statement) - - # Test search by Object - s, p = await return_response(query_handler, statement_id, therapy='rxcui:1430438') - check_statement(s, civic_aid6_statement) - - # Test search by Object Qualifier - s, p = await return_response(query_handler, statement_id, disease='ncit:C2926') - check_statement(s, civic_aid6_statement) - - # Test search by Gene Descriptor - # HGNC ID - s, p = await return_response(query_handler, statement_id, gene='hgnc:3236') - check_statement(s, civic_aid6_statement) - - # Label - s, p = await return_response(query_handler, statement_id, gene='EGFR') - check_statement(s, civic_aid6_statement) - - # Alt label - s, p = await return_response(query_handler, statement_id, gene='ERBB1') - check_statement(s, civic_aid6_statement) - - # Test search by Variation Descriptor - # Gene Symbol + Variant Name - s, p = await return_response(query_handler, statement_id, variation='EGFR L858R') - check_statement(s, civic_aid6_statement) - - # Alt Label - s, p = await return_response(query_handler, statement_id, - variation='egfr leu858arg') - check_statement(s, civic_aid6_statement) - - # HGVS Expression - s, p = await 
return_response(query_handler, statement_id, - variation='NP_005219.2:p.leu858arg') - check_statement(s, civic_aid6_statement) - - # Label - s, p = await return_response(query_handler, statement_id, therapy='afatinib') - check_statement(s, civic_aid6_statement) - - # Alt Label - s, p = await return_response(query_handler, statement_id, therapy='BIBW 2992') - check_statement(s, civic_aid6_statement) - - # Label - s, p = await return_response(query_handler, statement_id, - disease='Lung Non-small Cell Carcinoma ') - check_statement(s, civic_aid6_statement) - - -@pytest.mark.asyncio -async def test_multiple_parameters(query_handler): - """Test that multiple parameter searches work correctly.""" - # Test no match - response = await query_handler.search(variation=' braf v600e', gene='egfr', - disease='cancer', therapy='cisplatin') - assert_no_match(response) - - response = await query_handler.search(therapy='cisplatin', disease='4dfadfafas') - assert_no_match(response) - - # Test EID2997 queries - object_qualifier = 'ncit:C2926' - subject = 'ga4gh:VA.kgjrhgf84CEndyLjKdAO0RxN-e3pJjxA' - object = 'rxcui:1430438' - response = await query_handler.search( - variation='NP_005219.2:p.Leu858Arg', - disease='NSCLC', - therapy='Afatinib' - ) - for p in response['propositions']: - if p['id'] in response['matches']['propositions']: - assert p['object_qualifier'] == object_qualifier - assert p['subject'] == subject - assert p['object'] == object - - # Wrong gene - response = await query_handler.search( - variation='NP_005219.2:p.Leu858Arg', - disease='NSCLC', - therapy='Afatinib', - gene='braf' - ) - assert_no_match(response) - - # Test eid1409 queries - object_qualifier = 'ncit:C3510' - subject = 'ga4gh:VA.ZDdoQdURgO2Daj2NxLj4pcDnjiiAsfbO' - response = await query_handler.search( - variation=subject, - disease='malignant trunk melanoma' - ) - for p in response['propositions']: - if p['id'] in response['matches']['propositions']: - assert p['object_qualifier'] == object_qualifier - 
assert p['subject'] == subject - assert p['object'] - - # No Match for statement ID - response = await query_handler.search( - variation=subject, - disease='malignant trunk melanoma', - statement_id='civic.eid:2997' - ) - assert_no_match(response) - - # CIViC EID2997 - response = await query_handler.search( - statement_id='civiC.eid:2997', - variation='ga4gh:VA.kgjrhgf84CEndyLjKdAO0RxN-e3pJjxA' - ) - assert len(response['statements']) == 1 - assert len(response['propositions']) == 1 - assert len(response['matches']['statements']) == 1 - assert len(response['matches']['propositions']) == 1 - - # CIViC AID6 - response = await query_handler.search( - statement_id='CIViC.AID:6', - variation='ga4gh:VA.kgjrhgf84CEndyLjKdAO0RxN-e3pJjxA', - disease='ncit:C2926' - ) - assert len(response['statements']) > 1 - assert len(response['propositions']) > 1 - assert len(response['matches']['statements']) == 1 - assert len(response['matches']['propositions']) == 1 - - civic_aid6_supported_by_statements = list() - for s in response['statements']: - if s['id'] == 'civic.aid:6': - statement = s - else: - civic_aid6_supported_by_statements.append(s['id']) - supported_by_statements = [s for s in statement['supported_by'] if - s.startswith('civic.eid:')] - assert set(civic_aid6_supported_by_statements) == \ - set(supported_by_statements) - - response = await query_handler.search( - disease='ncit:C2926', - variation='ga4gh:VA.kgjrhgf84CEndyLjKdAO0RxN-e3pJjxA' - ) - statement_ids = list() - for s in response['statements']: - if s['id'] == 'civic.aid:6': - pass - else: - statement_ids.append(s['id']) - for aid6_statement in civic_aid6_supported_by_statements: - assert aid6_statement in statement_ids - assert len(response['matches']['statements']) > 1 - assert len(response['matches']['propositions']) > 1 - - -@pytest.mark.asyncio -async def test_civic_detail_flag_therapeutic( - query_handler, civic_eid2997_statement, civic_eid2997_proposition, civic_vid33, - civic_gid19, civic_did8, method1, 
pmid_23982599, civic_tid146, check_statement, - check_proposition, check_variation_descriptor, check_descriptor, check_method, - check_document -): - """Test that detail flag works correctly for CIViC Therapeutic Response.""" - response = await query_handler.search(statement_id='civic.eid:2997', detail=False) - assert_keys_for_detail_false(response.keys()) - - response = await query_handler.search(statement_id='civic.eid:2997', detail=True) - assert_keys_for_detail_true(response.keys(), response) - assert_response_items(response, civic_eid2997_statement, - civic_eid2997_proposition, - civic_vid33, civic_gid19, civic_did8, - method1, pmid_23982599, civic_tid146, - check_statement, check_proposition, - check_variation_descriptor, - check_descriptor, check_method, check_document - ) - - -@pytest.mark.asyncio -async def test_civic_detail_flag_diagnostic( - query_handler, civic_eid2_statement, civic_eid2_proposition, civic_vid99, - civic_did2, civic_gid38, method1, pmid_15146165, check_statement, check_proposition, - check_variation_descriptor, check_descriptor, check_method, check_document -): - """Test that detail flag works correctly for CIViC Diagnostic Response.""" - response = await query_handler.search(statement_id='civic.eid:2', detail=False) - assert_keys_for_detail_false(response.keys()) - - response = await query_handler.search(statement_id='civic.eid:2', detail=True) - assert_keys_for_detail_true(response.keys(), response, tr_response=False) - assert_response_items(response, civic_eid2_statement, - civic_eid2_proposition, - civic_vid99, civic_gid38, civic_did2, - method1, pmid_15146165, None, check_statement, - check_proposition, check_variation_descriptor, - check_descriptor, check_method, check_document) - - -@pytest.mark.asyncio -async def test_civic_detail_flag_prognostic( - query_handler, civic_eid26_statement, civic_eid26_proposition, civic_vid65, - civic_did3, civic_gid29, method1, pmid_16384925, check_statement, check_proposition, - 
check_variation_descriptor, check_descriptor, check_method, check_document -): - """Test that detail flag works correctly for CIViC Prognostic Response.""" - response = await query_handler.search(statement_id='civic.eid:26', detail=False) - assert_keys_for_detail_false(response.keys()) - - response = await query_handler.search(statement_id='civic.eid:26', detail=True) - assert_keys_for_detail_true(response.keys(), response, tr_response=False) - assert_response_items(response, civic_eid26_statement, - civic_eid26_proposition, - civic_vid65, civic_gid29, civic_did3, - method1, pmid_16384925, None, check_statement, - check_proposition, check_variation_descriptor, - check_descriptor, check_method, check_document) - - -@pytest.mark.asyncio -async def test_moa_detail_flag( - query_handler, moa_aid71_statement, moa_aid71_proposition, moa_vid67, moa_abl1, - moa_imatinib, moa_chronic_myelogenous_leukemia, method4, pmid_11423618, - check_statement, check_proposition, check_variation_descriptor, check_descriptor, - check_method, check_document -): - """Test that detail flag works correctly for MOA.""" - response = await query_handler.search(statement_id='moa.assertion:71', detail=False) - assert_keys_for_detail_false(response.keys()) - - response = await query_handler.search(statement_id='moa.assertion:71', detail=True) - assert_keys_for_detail_true(response.keys(), response) - assert_response_items(response, moa_aid71_statement, moa_aid71_proposition, - moa_vid67, moa_abl1, - moa_chronic_myelogenous_leukemia, method4, - pmid_11423618, moa_imatinib, check_statement, - check_proposition, check_variation_descriptor, - check_descriptor, check_method, check_document) - - -@pytest.mark.asyncio -async def test_no_matches(query_handler): - """Test invalid query matches.""" - # GA instead of VA - response = await query_handler.search('ga4gh:GA.WyOqFMhc8aOnMFgdY0uM7nSLNqxVPAiR') - assert_no_match(response) - - # Invalid ID - response = \ - await 
query_handler.search(disease='ncit:C292632425235321524352435623462') - assert_no_match(response) - - # Empty query - response = await query_handler.search(disease='') - assert_no_match(response) - - response = await query_handler.search(gene='', therapy='', variation='', disease='') - assert_no_match(response) - assert response['warnings'] == ['No parameters were entered.'] - - # Invalid variation - response = await query_handler.search(variation='v600e') - assert_no_match(response) - - response = query_handler.search_by_id('') - assert_no_match_id(response) - - response = query_handler.search_by_id(' ') - assert_no_match_id(response) - - response = query_handler.search_by_id('aid6') - assert_no_match_id(response) - - response = query_handler.search_by_id('civc.assertion:6') - assert_no_match_id(response) - - -def test_civic_id_search(query_handler, civic_eid2997_statement, - civic_vid33, civic_gid19, civic_tid146, civic_did8, - pmid_23982599, method1, check_statement, - check_variation_descriptor, check_descriptor, - check_method, check_document): - """Test search on civic node id""" - res = query_handler.search_by_id('civic.eid:2997') - check_statement(res['statement'], civic_eid2997_statement) - - res = query_handler.search_by_id('civic.vid:33') - check_variation_descriptor(res['variation_descriptor'], civic_vid33) - - res = query_handler.search_by_id('civic.gid:19') - check_descriptor(res['gene_descriptor'], civic_gid19) - - res = query_handler.search_by_id('civic.tid:146') - check_descriptor(res['therapy_descriptor'], civic_tid146) - - res = query_handler.search_by_id('civic.did:8') - check_descriptor(res['disease_descriptor'], civic_did8) - - res = query_handler.search_by_id('pmid:23982599') - check_document(res['document'], pmid_23982599) - - res = query_handler.search_by_id('method:1') - check_method(res['method'], method1) - - -def test_moa_id_search(query_handler, moa_aid71_statement, - moa_vid67, moa_abl1, moa_imatinib, - 
moa_chronic_myelogenous_leukemia, pmid_11423618, - method4, check_statement, check_variation_descriptor, - check_descriptor, check_method, check_document): - """Test search on moa node id""" - res = query_handler.search_by_id('moa.assertion:71') - check_statement(res['statement'], moa_aid71_statement) - - res = query_handler.search_by_id('moa.variant:71') - check_variation_descriptor(res['variation_descriptor'], moa_vid67) - - res = query_handler.search_by_id('moa.normalize.gene:ABL1') - check_descriptor(res['gene_descriptor'], moa_abl1) - - res = query_handler.search_by_id('moa.normalize.therapy:Imatinib') - check_descriptor(res['therapy_descriptor'], moa_imatinib) - - res = query_handler.search_by_id('moa.normalize.disease:oncotree%3ACML') - check_descriptor(res['disease_descriptor'], - moa_chronic_myelogenous_leukemia) - - res = query_handler.search_by_id('moa.normalize.disease:oncotree:CML') - check_descriptor(res['disease_descriptor'], - moa_chronic_myelogenous_leukemia) - - res = query_handler.search_by_id('pmid:11423618') - check_document(res['document'], pmid_11423618) - - res = query_handler.search_by_id(' method:4 ') - check_method(res['method'], method4) - - -@pytest.mark.asyncio -async def test_service_meta(query_handler): - """Test service meta in response""" - def check_service_meta(response): - """Check service meta in response is correct""" - assert "service_meta_" in response - service_meta_ = response["service_meta_"] - assert service_meta_["name"] == "metakb" - assert service_meta_["version"] == __version__ - assert service_meta_["last_updated"] == LAST_UPDATED - assert service_meta_["url"] == \ - "https://github.com/cancervariants/metakb" - - statement_id = "civic.eid:2997" - resp = await query_handler.search(statement_id=statement_id) - check_service_meta(resp) - - resp = query_handler.search_by_id("method:4") - check_service_meta(resp) - - resp = await query_handler.search_statements(statement_id=statement_id) - check_service_meta(resp) diff 
--git a/tests/unit/test_search_statements.py b/tests/unit/test_search_statements.py deleted file mode 100644 index 74b53afe..00000000 --- a/tests/unit/test_search_statements.py +++ /dev/null @@ -1,223 +0,0 @@ -"""Test the MetaKB search statements method""" -import copy - -import pytest - - -@pytest.fixture(scope="module") -def civic_vid33_with_gene(civic_vid33, civic_gid19): - """Create civic vid 33 test fixture""" - vid33 = copy.deepcopy(civic_vid33) - vid33["gene_context"] = civic_gid19 - return vid33 - - -@pytest.fixture(scope="module") -def civic_eid2997(civic_eid2997_proposition, civic_vid33_with_gene, - civic_tid146, civic_did8, method1, pmid_23982599): - """Create test fixture for CIViC EID2997""" - return { - "id": "civic.eid:2997", - "type": "Statement", - "description": "Afatinib, an irreversible inhibitor of the ErbB family of tyrosine kinases has been approved in the US for the first-line treatment of patients with metastatic non-small-cell lung cancer (NSCLC) who have tumours with EGFR exon 19 deletions or exon 21 (L858R) substitution mutations as detected by a US FDA-approved test", # noqa: E501 - "direction": "supports", - "evidence_level": "civic.evidence_level:A", - "proposition": civic_eid2997_proposition, - "variation_origin": "somatic", - "variation_descriptor": civic_vid33_with_gene, - "therapy_descriptor": civic_tid146, - "disease_descriptor": civic_did8, - "method": method1, - "supported_by": [pmid_23982599] - } - - -@pytest.fixture(scope="module") -def civic_aid6(civic_eid2997_proposition, civic_vid33_with_gene, civic_tid146, - civic_did8, method2, civic_aid6_document): - """Create test fixture for CIViC AID6""" - return { - "id": "civic.aid:6", - "description": "L858R is among the most common sensitizing EGFR mutations in NSCLC, and is assessed via DNA mutational analysis, including Sanger sequencing and next generation sequencing methods. 
Tyrosine kinase inhibitor afatinib is FDA approved, and is recommended (category 1) by NCCN guidelines along with erlotinib, gefitinib and osimertinib as first line systemic therapy in NSCLC with sensitizing EGFR mutation.", # noqa: E501 - "direction": "supports", - "evidence_level": "amp_asco_cap_2017_level:1A", - "proposition": civic_eid2997_proposition, - "variation_origin": "somatic", - "variation_descriptor": civic_vid33_with_gene, - "therapy_descriptor": civic_tid146, - "disease_descriptor": civic_did8, - "method": method2, - "supported_by": [ - civic_aid6_document, "civic.eid:2997", - "civic.eid:2629", "civic.eid:982", - "civic.eid:968", "civic.eid:883", - "civic.eid:879" - ], - "type": "Statement" - } - - -@pytest.fixture(scope="module") -def moa_vid71_with_gene(moa_vid67, moa_abl1): - """Create test fixture for MOA Variant 71 with gene descriptor""" - vid71 = copy.deepcopy(moa_vid67) - vid71["gene_context"] = moa_abl1 - return vid71 - - -@pytest.fixture(scope="module") -def moa_aid71(moa_aid71_proposition, moa_vid71_with_gene, moa_imatinib, - moa_chronic_myelogenous_leukemia, method4, - pmid_11423618): - """Create test fixture for MOA Assertion 71""" - return { - "id": "moa.assertion:71", - "type": "Statement", - "description": "T315I mutant ABL1 in p210 BCR-ABL cells resulted in retained high levels of phosphotyrosine at increasing concentrations of inhibitor STI-571, whereas wildtype appropriately received inhibition.", # noqa: E501 - "evidence_level": "moa.evidence_level:Preclinical", - "proposition": moa_aid71_proposition, - "variation_origin": "somatic", - "variation_descriptor": moa_vid71_with_gene, - "therapy_descriptor": moa_imatinib, - "disease_descriptor": moa_chronic_myelogenous_leukemia, - "method": method4, - "supported_by": [pmid_11423618] - } - - -def assert_general_search_statements(response): - """Check that general search statement queries return a valid response""" - assert response["matches"] - assert 
len(response["matches"]["propositions"]) > 0 - len_statement_matches = len(response["matches"]["statements"]) - assert len_statement_matches > 0 - len_statements = len(response["statements"]) - assert len_statements > 0 - assert len_statement_matches == len_statements - - -def assert_no_match(response): - """No match assertions for queried concepts in search search statements.""" - assert response["statements"] == [] - assert len(response["matches"]["propositions"]) == 0 - assert len(response["matches"]["statements"]) == 0 - assert len(response["warnings"]) > 0 - - -def check_statement_assertions( - actual, test, check_proposition, check_variation_descriptor, - check_descriptor, check_method): - """Check that statement response is correct""" - for key in ["id", "type", "description", "evidence_level", - "variation_origin", "method"]: - assert actual[key] == test[key] - if "direction" in test.keys(): - # MOA doesn"t have direction - assert actual["direction"] == test["direction"] - else: - assert "direction" not in actual.keys() - - check_proposition(actual["proposition"], test["proposition"]) - check_variation_descriptor(actual["variation_descriptor"], - test["variation_descriptor"]) - check_descriptor(actual["disease_descriptor"], test["disease_descriptor"]) - if test.get("therapy_descriptor"): - check_descriptor(actual["therapy_descriptor"], - test["therapy_descriptor"]) - else: - assert actual.get("therapy_descriptor") is None - check_method(actual["method"], test["method"]) - assert len(actual["supported_by"]) == len(test["supported_by"]) - for sb in test["supported_by"]: - assert sb in actual["supported_by"] - - -@pytest.mark.asyncio -async def test_civic_eid2997( - query_handler, civic_eid2997, check_proposition, - check_variation_descriptor, check_descriptor, check_method): - """Test that search_statements works correctly for CIVIC EID2997""" - resp = await query_handler.search_statements(statement_id="civic.eid:2997") - assert len(resp["statements"]) == 1 - 
assert resp["matches"]["statements"] == ["civic.eid:2997"] - assert len(resp["matches"]["propositions"]) == 1 - check_statement_assertions( - resp["statements"][0], civic_eid2997, check_proposition, - check_variation_descriptor, check_descriptor, check_method) - assert resp["warnings"] == [] - - -@pytest.mark.asyncio -async def test_civic_aid6( - query_handler, civic_aid6, civic_eid2997, check_proposition, - check_variation_descriptor, check_descriptor, check_method): - """Test that search_statements works correctly for CIVIC EID2997""" - resp = await query_handler.search_statements(statement_id="civic.aid:6") - assert len(resp["statements"]) == 7 - assert resp["matches"]["statements"] == ["civic.aid:6"] - assert len(resp["matches"]["propositions"]) == 1 - assert resp["warnings"] == [] - found_eid2997 = False - found_aid6 = False - - for s in resp["statements"]: - if s["id"] == "civic.eid:2997": - check_statement_assertions( - s, civic_eid2997, check_proposition, - check_variation_descriptor, check_descriptor, check_method) - found_eid2997 = True - elif s["id"] == "civic.aid:6": - check_statement_assertions( - s, civic_aid6, check_proposition, - check_variation_descriptor, check_descriptor, check_method) - found_aid6 = True - assert found_eid2997 - assert found_aid6 - - -@pytest.mark.asyncio -async def test_moa(query_handler, moa_aid71, check_proposition, - check_variation_descriptor, check_descriptor, check_method): - """Test that search_statements works correctly for MOA Assertion 71""" - resp = await query_handler.search_statements( - statement_id="moa.assertion:71") - assert len(resp["statements"]) == 1 - check_statement_assertions( - resp["statements"][0], moa_aid71, check_proposition, - check_variation_descriptor, check_descriptor, check_method) - assert resp["warnings"] == [] - - -@pytest.mark.asyncio -async def test_general_search_statements(query_handler): - """Test that queries do not return errors""" - resp = await 
query_handler.search_statements(variation="BRAF V600E") - assert_general_search_statements(resp) - - resp = await query_handler.search_statements(variation="EGFR L858R") - assert_general_search_statements(resp) - - resp = await query_handler.search_statements(disease="cancer") - assert_general_search_statements(resp) - - -@pytest.mark.asyncio -async def test_no_matches(query_handler): - """Test invalid queries""" - # invalid vrs variation prefix - resp = await query_handler.search_statements( - variation="ga4gh:variation.kgjrhgf84CEndyLjKdAO0RxN-e3pJjxA") - assert_no_match(resp) - - # invalid id - resp = await query_handler.search_statements( - disease="ncit:C292632425235321524352435623462" - ) - assert_no_match(resp) - - resp = await query_handler.search_statements(statement_id="civic:aid6") - assert_no_match(resp) - - # empty query - resp = await query_handler.search_statements(therapy="") - assert_no_match(resp) diff --git a/tests/unit/test_search_studies.py b/tests/unit/test_search_studies.py new file mode 100644 index 00000000..18123cab --- /dev/null +++ b/tests/unit/test_search_studies.py @@ -0,0 +1,224 @@ +"""Test the MetaKB search_studies method""" +from typing import Dict + +import pytest + +from metakb.query import QueryHandler +from metakb.schemas.api import SearchStudiesService + + +@pytest.fixture(scope="module") +def query_handler(normalizers): + """Create query handler test fixture""" + return QueryHandler(normalizers=normalizers) + + +def assert_general_search_studies(response): + """Check that general search_studies queries return a valid response""" + len_study_id_matches = len(response.study_ids) + assert len_study_id_matches > 0 + len_studies = len(response.studies) + assert len_studies > 0 + assert len_study_id_matches == len_studies + + +def assert_no_match(response): + """No match assertions for queried concepts in search_studies.""" + assert response.studies == response.study_ids == [] + assert len(response.warnings) > 0 + + +def 
find_and_check_study( + resp: SearchStudiesService, + expected_study: Dict, + assertion_checks: callable, + should_find_match: bool = True +): + """Check that expected study is or is not in response""" + if should_find_match: + assert expected_study["id"] in resp.study_ids + else: + assert expected_study["id"] not in resp.study_ids + + actual_study = None + for study in resp.studies: + if study.id == expected_study["id"]: + actual_study = study + break + + if should_find_match: + assert actual_study, f"Did not find study ID {expected_study['id']} in studies" + resp_studies = [actual_study.model_dump(exclude_none=True)] + assertion_checks(resp_studies, [expected_study]) + else: + assert actual_study is None + + +@pytest.mark.asyncio(scope="module") +async def test_civic_eid2997(query_handler, civic_eid2997_study, assertion_checks): + """Test that search_studies method works correctly for CIViC EID2997""" + resp = await query_handler.search_studies(study_id=civic_eid2997_study["id"]) + assert resp.study_ids == [civic_eid2997_study["id"]] + resp_studies = [s.model_dump(exclude_none=True) for s in resp.studies] + assertion_checks(resp_studies, [civic_eid2997_study]) + assert resp.warnings == [] + + resp = await query_handler.search_studies(variation="EGFR L858R") + find_and_check_study(resp, civic_eid2997_study, assertion_checks) + + resp = await query_handler.search_studies( + variation="ga4gh:VA.S41CcMJT2bcd8R4-qXZWH1PoHWNtG2PZ" + ) + find_and_check_study(resp, civic_eid2997_study, assertion_checks) + + # genomic query + resp = await query_handler.search_studies(variation="7-55259515-T-G") + find_and_check_study(resp, civic_eid2997_study, assertion_checks) + # At the moment, MOA cannot be queried via related genomic queries + # only civic stores genomic members + assert not [s_id for s_id in resp.study_ids if s_id.startswith("moa")] + + resp = await query_handler.search_studies(therapy="ncit:C66940") + find_and_check_study(resp, civic_eid2997_study, assertion_checks) 
+ + resp = await query_handler.search_studies(gene="EGFR") + find_and_check_study(resp, civic_eid2997_study, assertion_checks) + + resp = await query_handler.search_studies(disease="nsclc") + find_and_check_study(resp, civic_eid2997_study, assertion_checks) + + # We should not find CIViC EID2997 using these queries + resp = await query_handler.search_studies(study_id="civic.eid:3017") + find_and_check_study(resp, civic_eid2997_study, assertion_checks, False) + + resp = await query_handler.search_studies(variation="BRAF V600E") + find_and_check_study(resp, civic_eid2997_study, assertion_checks, False) + + resp = await query_handler.search_studies(therapy="imatinib") + find_and_check_study(resp, civic_eid2997_study, assertion_checks, False) + + resp = await query_handler.search_studies(gene="BRAF") + find_and_check_study(resp, civic_eid2997_study, assertion_checks, False) + + resp = await query_handler.search_studies(disease="DOID:9253") + find_and_check_study(resp, civic_eid2997_study, assertion_checks, False) + + +@pytest.mark.asyncio(scope="module") +async def test_civic816(query_handler, civic_eid816_study, assertion_checks): + """Test that search_studies method works correctly for CIViC EID816""" + resp = await query_handler.search_studies(study_id=civic_eid816_study["id"]) + assert resp.study_ids == [civic_eid816_study["id"]] + resp_studies = [s.model_dump(exclude_none=True) for s in resp.studies] + assertion_checks(resp_studies, [civic_eid816_study]) + assert resp.warnings == [] + + # Try querying based on therapies in substitutes + resp = await query_handler.search_studies(therapy="Cetuximab") + find_and_check_study(resp, civic_eid816_study, assertion_checks) + + resp = await query_handler.search_studies(therapy="Panitumumab") + find_and_check_study(resp, civic_eid816_study, assertion_checks) + + +@pytest.mark.asyncio(scope="module") +async def test_civic9851(query_handler, civic_eid9851_study, assertion_checks): + """Test that search_studies method works 
correctly for CIViC EID9851""" + resp = await query_handler.search_studies(study_id=civic_eid9851_study["id"]) + assert resp.study_ids == [civic_eid9851_study["id"]] + resp_studies = [s.model_dump(exclude_none=True) for s in resp.studies] + assertion_checks(resp_studies, [civic_eid9851_study]) + assert resp.warnings == [] + + # Try querying based on therapies in components + resp = await query_handler.search_studies(therapy="Encorafenib") + find_and_check_study(resp, civic_eid9851_study, assertion_checks) + + resp = await query_handler.search_studies(therapy="Cetuximab") + find_and_check_study(resp, civic_eid9851_study, assertion_checks) + + +@pytest.mark.asyncio(scope="module") +async def test_moa_66(query_handler, moa_aid66_study, assertion_checks): + """Test that search_studies method works correctly for MOA Assertion 66""" + resp = await query_handler.search_studies(study_id=moa_aid66_study["id"]) + assert resp.study_ids == [moa_aid66_study["id"]] + resp_studies = [s.model_dump(exclude_none=True) for s in resp.studies] + assertion_checks(resp_studies, [moa_aid66_study]) + assert resp.warnings == [] + + resp = await query_handler.search_studies(variation="ABL1 Thr315Ile") + find_and_check_study(resp, moa_aid66_study, assertion_checks) + + resp = await query_handler.search_studies( + variation="ga4gh:VA.D6NzpWXKqBnbcZZrXNSXj4tMUwROKbsQ" + ) + find_and_check_study(resp, moa_aid66_study, assertion_checks) + + resp = await query_handler.search_studies(therapy="rxcui:282388") + find_and_check_study(resp, moa_aid66_study, assertion_checks) + + resp = await query_handler.search_studies(gene="ncbigene:25") + find_and_check_study(resp, moa_aid66_study, assertion_checks) + + resp = await query_handler.search_studies(disease="CML") + find_and_check_study(resp, moa_aid66_study, assertion_checks) + + # We should not find MOA Assertion 66 using these queries + resp = await query_handler.search_studies(study_id="moa.assertion:71") + find_and_check_study(resp, moa_aid66_study,
assertion_checks, False) + + resp = await query_handler.search_studies(variation="BRAF V600E") + find_and_check_study(resp, moa_aid66_study, assertion_checks, False) + + resp = await query_handler.search_studies(therapy="Afatinib") + find_and_check_study(resp, moa_aid66_study, assertion_checks, False) + + resp = await query_handler.search_studies(gene="ABL2") + find_and_check_study(resp, moa_aid66_study, assertion_checks, False) + + resp = await query_handler.search_studies(disease="ncit:C2926") + find_and_check_study(resp, moa_aid66_study, assertion_checks, False) + + +@pytest.mark.asyncio(scope="module") +async def test_general_search_studies(query_handler): + """Test that queries do not return errors""" + resp = await query_handler.search_studies(variation="BRAF V600E") + assert_general_search_studies(resp) + + resp = await query_handler.search_studies(variation="EGFR L858R") + assert_general_search_studies(resp) + + resp = await query_handler.search_studies(disease="cancer") + assert_general_search_studies(resp) + + resp = await query_handler.search_studies(therapy="Cetuximab") + assert_general_search_studies(resp) + + resp = await query_handler.search_studies(gene="VHL") + assert_general_search_studies(resp) + + +@pytest.mark.asyncio(scope="module") +async def test_no_matches(query_handler): + """Test invalid queries""" + # invalid vrs variation prefix (digest is correct) + resp = await query_handler.search_studies( + variation="ga4gh:variation.TAARa2cxRHmOiij9UBwvW-noMDoOq2x9" + ) + assert_no_match(resp) + + # invalid id + resp = await query_handler.search_studies( + disease="ncit:C292632425235321524352435623462" + ) + assert_no_match(resp) + + # empty query + resp = await query_handler.search_studies() + assert_no_match(resp) + + # valid queries, but no matches with combination + resp = await query_handler.search_studies(variation="BRAF V600E", gene="EGFR") + assert_no_match(resp) diff --git a/tests/unit/transform/test_moa_transform.py 
b/tests/unit/transform/test_moa_transform.py index 68e7d392..9156fea5 100644 --- a/tests/unit/transform/test_moa_transform.py +++ b/tests/unit/transform/test_moa_transform.py @@ -199,9 +199,9 @@ def moa_aid155_study( @pytest.fixture(scope="module") -def studies(moa_aid67_study, moa_aid155_study): +def studies(moa_aid66_study, moa_aid155_study): """Create test fixture for MOA therapeutic studies.""" - return [moa_aid67_study, moa_aid155_study] + return [moa_aid66_study, moa_aid155_study] def test_moa_cdm(data, studies, check_transformed_cdm):