Skip to content

Commit

Permalink
wip: initial cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
korikuzma committed Jan 30, 2025
1 parent 02ea919 commit 566bf3a
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 70 deletions.
36 changes: 15 additions & 21 deletions src/metakb/transformers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,20 @@


def _sanitize_name(name: str) -> str:
"""Trim leading and trailing whitespace and replace whitespace with underscore
"""Trim leading and trailing whitespace and replace whitespace characters with
underscores
:param name: Name to sanitize
:return: Sanitized string with spaces replaced by underscores
:return: Sanitized string with whitespace characters replaced by underscores
"""
return re.sub(r"\s+", "_", name.strip())


class NormalizerExtensionName(str, Enum):
"""Define constraints for normalizer extension names"""

PRIORITY = "vicc_normalizer_priority"
FAILURE = "vicc_normalizer_failure"
PRIORITY = "vicc_normalizer_priority" # concept mapping is merged concept ID
FAILURE = "vicc_normalizer_failure" # normalizer failed or is not supported


class EcoLevel(str, Enum):
Expand Down Expand Up @@ -126,7 +127,7 @@ class ViccConceptVocab(BaseModel):
definition: StrictStr


class _Cache(BaseModel):
class _TransformedRecordsCache(BaseModel):
"""Define model for caching transformed records"""

therapies: ClassVar[dict[str, MappableConcept]] = {}
Expand Down Expand Up @@ -288,7 +289,7 @@ def __init__(
:param Optional[Path] harvester_path: Path to previously harvested data
:param ViccNormalizers normalizers: normalizer collection instance
"""
self._cache: _Cache
self._cache: _TransformedRecordsCache
self.name = self.__class__.__name__.lower().split("transformer")[0]
self.data_dir = data_dir / self.name
self.harvester_path = harvester_path
Expand Down Expand Up @@ -422,7 +423,7 @@ def _get_therapy(self, therapy: dict) -> MappableConcept | None:
"""Get therapy mappable concept for source therapy object
:param therapy: source therapy object
:return: If able to normalize therapy, returns therapy mappable concept
:return: therapy mappable concept
"""

@abstractmethod
Expand All @@ -437,8 +438,7 @@ def _get_therapeutic_substitute_group(
:param therapeutic_sub_group_id: ID for Therapeutic Substitute Group
:param therapies: List of therapy objects
:param therapy_interaction_type: Therapy interaction type
:return: If able to normalize all therapy objects in `therapies`, returns
Therapeutic Substitute Group
:return: Therapeutic Substitute Group
"""

def _get_combination_therapy(
Expand All @@ -452,8 +452,7 @@ def _get_combination_therapy(
:param combination_therapy_id: ID for Combination Therapy
:param therapies: List of source therapy objects
:param therapy_interaction_type: Therapy type provided by source
:return: If able to normalize all therapy objects in `therapies`, returns
Combination Therapy
:return: Combination Therapy
"""
therapies = []
source_name = type(self).__name__.lower().replace("transformer", "")
Expand Down Expand Up @@ -505,20 +504,17 @@ def _add_therapy(
therapies: list[dict],
therapy_type: TherapyType,
therapy_interaction_type: str | None = None,
) -> MappableConcept:
) -> MappableConcept | None:
"""Create or get therapy mappable concept given therapies
First look in cache for existing therapy, if not found will attempt to
normalize. Will add `therapy_id` to `therapies` and
`able_to_normalize['therapies']` if therapy-normalizer is able to normalize all
`therapies`. Else, will add the `therapy_id` to
`unable_to_normalize['therapies']`
First look in ``_cache`` for existing therapy, if not found will attempt to
transform. Will add ``therapy_id`` to ``therapies`` and ``_cache.therapies``
:param therapy_id: ID for therapy
:param therapies: List of therapy objects. If `therapy_type` is
`TherapyType.THERAPY`, the list will only contain a single therapy.
:param therapy_type: The type of therapy
:param therapy_interaction_type: drug interaction type
:return: Therapy mappable concept
:return: Therapy mappable concept, if ``therapy_type`` is supported
"""
therapy = self._cache.therapies.get(therapy_id)
if therapy:
Expand All @@ -535,7 +531,7 @@ def _add_therapy(
therapy_id, therapies, therapy_interaction_type
)
else:
# not supported
logger.debug("Therapy type is not supported: %s", therapy_type)
return None

self._cache.therapies[therapy_id] = therapy
Expand Down Expand Up @@ -585,8 +581,6 @@ def _add_merged_id_ext(
mappings: list[ConceptMapping] = []
attr_name = NORMALIZER_INSTANCE_TO_ATTR[type(normalizer_resp)]
normalizer_resp_obj = getattr(normalizer_resp, attr_name)
if not normalizer_resp_obj:
return mappings

normalizer_mappings = normalizer_resp_obj.mappings or []
if isinstance(normalizer_resp, NormalizedDisease):
Expand Down
47 changes: 18 additions & 29 deletions src/metakb/transformers/civic.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
MethodId,
TherapyType,
Transformer,
_Cache,
_TransformedRecordsCache,
)

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -154,7 +154,7 @@ class SourcePrefix(str, Enum):
ASH = "ASH"


class _CivicCache(_Cache):
class _CivicTransformedCache(_TransformedRecordsCache):
"""Create model for caching CIViC data"""

variations: ClassVar[dict[str, _VariationCache]] = {}
Expand Down Expand Up @@ -192,7 +192,7 @@ def __init__(
self.processed_data.methods = [
self.methods_mapping[MethodId.CIVIC_EID_SOP.value]
]
self._cache = _CivicCache()
self._cache = _CivicTransformedCache()

@staticmethod
def _mp_to_variant_mapping(molecular_profiles: list[dict]) -> tuple[list, dict]:
Expand Down Expand Up @@ -274,13 +274,10 @@ async def transform(self, harvested_data: CivicHarvestedData) -> None:
await self._add_variations(variants)
self._add_genes(harvested_data.genes)

# Only want to add MPs where variation-normalizer succeeded for the related
# variant. Will update `categorical_variants`
able_to_normalize_vids = self._cache.variations.keys()
mps = [
mp
for mp in molecular_profiles
if f"civic.vid:{mp['variant_ids'][0]}" in able_to_normalize_vids
if f"civic.vid:{mp['variant_ids'][0]}" in self._cache.variations
]
self._add_categorical_variants(mps, mp_id_to_v_id_mapping)

Expand All @@ -300,8 +297,7 @@ def _add_variant_study_stmt(
"""Create Variant Study Statement given CIViC Evidence Items.
Will add associated values to ``processed_data`` instance variable
(``therapies``, ``conditions``, and ``documents``).
``_cache`` and ``unable_to_normalize`` will also be mutated for
associated therapies and conditions.
``_cache`` will also be mutated for associated therapies and conditions.
:param record: CIViC Evidence Item or Assertion
:param mp_id_to_v_id_mapping: Molecular Profile ID to Variant ID mapping
Expand Down Expand Up @@ -506,8 +502,7 @@ def _add_categorical_variants(
``processed_data.categorical_variants``.
:param molecular_profiles: List of supported Molecular Profiles in CIViC.
The associated, single variant record for each MP was successfully
normalized
The associated, single variant record for each MP was successfully transformed
:param mp_id_to_v_id_mapping: Mapping from Molecular Profile ID to Variant ID
{mp_id: v_id}
"""
Expand Down Expand Up @@ -557,8 +552,6 @@ def _add_categorical_variants(
allele=civic_variation_data.vrs_variation.root,
)
]
else:
constraints = None

cv = CategoricalVariant(
id=mp_id,
Expand Down Expand Up @@ -620,10 +613,9 @@ def _is_supported_variant_query(variant_name: str, variant_id: int) -> bool:

async def _get_variation_members(self, variant: dict) -> list[Variation] | None:
"""Get members field for variation object. This is the related variant concepts.
For now, we will only do genomic HGVS expressions
:param variant: CIViC Variant record
:return: List containing one VRS variation record for associated genomic HGVS
:return: List containing one VRS variation record for associated HGVS
expression, if variation-normalizer was able to normalize
"""
members = []
Expand Down Expand Up @@ -654,10 +646,10 @@ async def _get_variation_members(self, variant: dict) -> list[Variation] | None:
return members

async def _add_variations(self, variants: list[dict]) -> None:
"""Normalize supported CIViC variant records.
"""Transform supported CIViC variant records.
Mutates instance variables ``_cache.variations`` and
``processed_data.variations``, if the variation-normalizer can successfully
normalize the variant
``processed_data.variations``
:param variants: List of all CIViC variant records
"""
Expand Down Expand Up @@ -815,9 +807,8 @@ def _get_expressions(self, variant: dict) -> list[Expression]:

def _add_genes(self, genes: list[dict]) -> None:
"""Create gene objects for all CIViC gene records.
Mutates instance variables ``_cache.genes`` and
``processed_data.genes``, if the gene-normalizer can successfully normalize the
gene
Mutates instance variables ``_cache.genes`` and ``processed_data.genes``
:param genes: All genes in CIViC
"""
Expand Down Expand Up @@ -876,12 +867,11 @@ def _add_genes(self, genes: list[dict]) -> None:
def _add_disease(self, disease: dict) -> MappableConcept:
"""Create or get disease given CIViC disease.
First looks in cache for existing disease, if not found will attempt to
normalize. Will add CIViC disease ID to ``processed_data.conditions`` and
``_cache.conditions`` if disease-normalizer is able to normalize.
Else will add the CIViC disease ID to ``unable_to_normalize['conditions']``
transform. Will add CIViC disease ID to ``processed_data.conditions`` and
``_cache.conditions``
:param disease: CIViC Disease object
:return:
:return: Disease represented as mappable concept
"""
disease_id = f"civic.did:{disease['id']}"
civic_disease = self._cache.conditions.get(disease_id)
Expand All @@ -897,7 +887,7 @@ def _get_disease(self, disease: dict) -> MappableConcept:
"""Get Disease object for a CIViC disease
:param disease: CIViC disease record
:return:
:return: Disease represented as a mappable concept
"""
disease_id = f"civic.did:{disease['id']}"
display_name = disease["display_name"]
Expand Down Expand Up @@ -960,8 +950,7 @@ def _get_therapeutic_substitute_group(
:param therapeutic_sub_group_id: ID for Therapeutic Substitute Group
:param therapies_in: List of CIViC therapy objects
:param therapy_interaction_type: Therapy interaction type provided by CIViC
:return: If able to normalize all therapy objects in `therapies`, returns
Therapeutic Substitute Group
:return: Therapeutic Substitute Group
"""
therapies = []

Expand Down Expand Up @@ -1007,7 +996,7 @@ def _get_therapy(self, therapy_id: str, therapy: dict) -> MappableConcept:
:param therapy_id: ID for therapy
:param therapy: CIViC therapy object
:return:
:return: Therapy represented as a mappable concept
"""
label = therapy["name"]
ncit_id = f"ncit:{therapy['ncit_id']}"
Expand Down
37 changes: 17 additions & 20 deletions src/metakb/transformers/moa.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@
MoaEvidenceLevel,
TherapyType,
Transformer,
_Cache,
_sanitize_name,
_TransformedRecordsCache,
)

logger = logging.getLogger(__name__)


class _MoaCache(_Cache):
class _MoaTransformedCache(_TransformedRecordsCache):
"""Create model for caching MOA data"""

variations: ClassVar[dict[str, dict]] = {}
Expand Down Expand Up @@ -76,7 +76,7 @@ def __init__(
self.processed_data.methods = [
self.methods_mapping[MethodId.MOA_ASSERTION_BIORXIV.value]
]
self._cache = _MoaCache()
self._cache = _MoaTransformedCache()

async def transform(self, harvested_data: MoaHarvestedData) -> None:
"""Transform MOA harvested JSON to common data model. Will store transformed
Expand All @@ -98,8 +98,7 @@ async def _add_variant_study_stmt(self, assertion: dict) -> None:
"""Create Variant Study Statements from MOA assertions.
Will add associated values to ``processed_data`` instance variable
(``therapies``, ``conditions``, and ``statements``).
``_cache`` and ``unable_to_normalize`` will
also be mutated for associated therapies and conditions.
``_cache`` will also be mutated for associated therapies and conditions.
:param assertion: MOA assertion record
"""
Expand Down Expand Up @@ -207,9 +206,9 @@ async def _add_variant_study_stmt(self, assertion: dict) -> None:

async def _add_categorical_variants(self, variants: list[dict]) -> None:
"""Create Categorical Variant objects for all MOA variant records.
Mutates instance variables ``_cache['variations']`` and
``processed_data.variations``, if the variation-normalizer can successfully
normalize the variant
``processed_data.variations``
:param variants: All variants in MOAlmanac
"""
Expand Down Expand Up @@ -334,7 +333,8 @@ async def _get_variation_members(
self, moa_rep_coord: dict
) -> list[Variation] | None:
"""Get members field for variation object. This is the related variant concepts.
FOr now, only looks at genomic representative coordinate.
For now, only looks at genomic representative coordinate.
:param moa_rep_coord: MOA Representative Coordinate
:return: List containing one VRS variation record for associated genomic
Expand Down Expand Up @@ -375,9 +375,8 @@ async def _get_variation_members(

def _add_genes(self, genes: list[str]) -> None:
"""Create gene objects for all MOA gene records.
Mutates instance variables ``_cache['genes']`` and
``processed_data.genes``, if the gene-normalizer can successfully normalize the
gene
Mutates instance variables ``_cache['genes']`` and ``processed_data.genes``
:param genes: All genes in MOAlmanac
"""
Expand Down Expand Up @@ -448,7 +447,7 @@ def _get_therapy_or_group(
"""Get therapy mappable concept (single) or therapy group (multiple)
:param assertion: MOA assertion record
:return: Therapy object, if found and able to be normalized
:return: Therapy object represented as a mappable concept or therapy group
"""
therapy = assertion["therapy"]
therapy_name = therapy["name"]
Expand Down Expand Up @@ -509,7 +508,7 @@ def _get_therapy(self, therapy_id: str, therapy: dict) -> MappableConcept:
:param therapy_id: Generated therapy ID
:param therapy: MOA therapy name
:return: If able to normalize therapy
:return: Therapy represented as a mappable concept
"""
mappings = []
extensions = []
Expand Down Expand Up @@ -549,18 +548,16 @@ def _add_disease(self, disease: dict) -> MappableConcept | None:
"""Create or get disease given MOA disease.
First looks in cache for existing disease, if not found will attempt to
normalize. Will generate a digest from the original MOA disease object oncotree
fields. This will be used as the key in the caches. Will add the generated digest
to ``processed_data.conditions`` and ``_cache['conditions']`` if
disease-normalizer is able to normalize. Else will add the generated digest to
``unable_to_normalize['conditions']``.
transform. Will generate a digest from the original MOA disease object oncotree
fields. This will be used as the key in the caches. Will add the generated
digest to ``processed_data.conditions`` and ``_cache['conditions']``.
Since there may be duplicate Oncotree code/terms with different names, the first
name will be used as the Disease label. Others will be added to the extensions
aliases field.
:param disease: MOA disease object
:return: Disease object
:return: Disease represented as a mappable concept
"""
if not all(value for value in disease.values()):
return None
Expand Down Expand Up @@ -604,7 +601,7 @@ def _get_disease(self, disease: dict) -> MappableConcept:
"""Get Disease object for a MOA disease
:param disease: MOA disease record
:return: If able to normalize, Disease
:return: Disease represented as a mappable concept
"""
queries = []
mappings = []
Expand Down

0 comments on commit 566bf3a

Please sign in to comment.