From 1bbfdd325839fbeb6117f80614a5fd1cf0fadaed Mon Sep 17 00:00:00 2001 From: cb-Hades <81743695+cb-Hades@users.noreply.github.com> Date: Fri, 9 Aug 2024 17:03:44 +0200 Subject: [PATCH] Update gapfill #126 #52 Added functions for adding reac / metabs per database id --- dev/gapfill-testing.ipynb | 1408 +++++---------------- src/refinegems/__init__.py | 3 +- src/refinegems/classes/gapfill.py | 107 +- src/refinegems/curation/db_access/db.py | 116 +- src/refinegems/curation/db_access/kegg.py | 2 +- src/refinegems/developement/__init__.py | 3 + src/refinegems/developement/decorators.py | 37 + src/refinegems/utility/entities.py | 1287 ++++++++++++++++++- 8 files changed, 1813 insertions(+), 1150 deletions(-) create mode 100644 src/refinegems/developement/__init__.py create mode 100644 src/refinegems/developement/decorators.py diff --git a/dev/gapfill-testing.ipynb b/dev/gapfill-testing.ipynb index a402959..3330b85 100644 --- a/dev/gapfill-testing.ipynb +++ b/dev/gapfill-testing.ipynb @@ -57,10 +57,109 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 149, "metadata": {}, - "outputs": [], - "source": [] + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
ncbiproteinlocus_tagec-codeUniProt
303WP_011274811.1SH04863.2.-.-[Q4L980]
304WP_011274812.1SH04872.3.1.-[Q4L979]
305WP_011274813.1SH04881.14.99.-[Q4L978]
306WP_011274814.1SH04892.4.1.-[Q4L977]
307WP_011274815.1SH04902.5.1.96[Q4L976]
308WP_080004924.1SH04911.3.8.-[Q4L975]
312WP_011274824.1SH04992.3.1.-[Q4L967]
\n", + "
" + ], + "text/plain": [ + " ncbiprotein locus_tag ec-code UniProt\n", + "303 WP_011274811.1 SH0486 3.2.-.- [Q4L980]\n", + "304 WP_011274812.1 SH0487 2.3.1.- [Q4L979]\n", + "305 WP_011274813.1 SH0488 1.14.99.- [Q4L978]\n", + "306 WP_011274814.1 SH0489 2.4.1.- [Q4L977]\n", + "307 WP_011274815.1 SH0490 2.5.1.96 [Q4L976]\n", + "308 WP_080004924.1 SH0491 1.3.8.- [Q4L975]\n", + "312 WP_011274824.1 SH0499 2.3.1.- [Q4L967]" + ] + }, + "execution_count": 149, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mapped_res[0]" + ] }, { "cell_type": "markdown", @@ -95,124 +194,12 @@ "import re\n", "from refinegems.utility.cvterms import add_cv_term_genes\n", "from libsbml import FbcOr, FbcAnd, GeneProductRef\n", + "from libsbml import Model as libModel\n", "import warnings\n", + "import libsbml\n", + "from typing import Union,List\n", "\n", "\n", - "# @TODO merge with the function of the same name in entities, if possible\n", - "# or just use them separatly \n", - "# @TODO generalise addition of references -> maybe kwargs\n", - "# @TODO\n", - "# what to do about the name\n", - "def create_gp(model, protein_id, \n", - " name:str=None, locus_tag:str=None,\n", - " uniprot:tuple[str,bool]=None):\n", - " \n", - " # create gene product object\n", - " gp = model.getPlugin(0).createGeneProduct()\n", - " # set basic attributes\n", - " geneid = f'G_{protein_id}'.replace('.','_') # remove problematic signs\n", - " gp.setIdAttribute(geneid) # ID \n", - " if name: gp.setName(name) # Name \n", - " if locus_tag: gp.setLabel(locus_tag) # Label\n", - " gp.setSBOTerm('SBO:0000243') # SBOterm\n", - " gp.setMetaId(f'meta_G_{protein_id}') # Meta ID\n", - " # test for NCBI/RefSeq\n", - " if re.fullmatch('^(((AC|AP|NC|NG|NM|NP|NR|NT|NW|WP|XM|XP|XR|YP|ZP)_\\d+)|(NZ_[A-Z]{2,4}\\d+))(\\.\\d+)?$', protein_id, re.IGNORECASE):\n", - " id_db = 'REFSEQ'\n", - " elif re.fullmatch('^(\\w+\\d+(\\.\\d+)?)|(NP_\\d+)$', protein_id, re.IGNORECASE): id_db = 'NCBI'\n", - " if id_db: add_cv_term_genes(protein_id, id_db, gp) # NCBI protein\n", - " # add further references\n", - " # @TODO extend or generalise\n", - " if uniprot:\n", - " for uniprotid in uniprot[0]:\n", - " add_cv_term_genes(uniprotid, 'UNIPROT', gp, uniprot[1]) # UniProt\n", - " \n", - " \n", - "# probably sort into GapFiller\n", - "def add_genes_from_table(model, gene_table:pd.DataFrame):\n", - " \n", - " # ncbiprotein | locus_tag | ec-code | ...\n", - " # work on a copy to ensure input stays the same\n", - " gene_table = gene_table.copy()\n", - " gene_table.drop(columns=['ec-code'],inplace=True)\n", - " \n", - " # create gps from the table and add them to the model\n", - " for idx,x in gene_table.iterrows():\n", - " create_gp(model, x['ncbiprotein'], \n", - " locus_tag=x['locus_tag'],\n", - " uniprot=(x['UniProt'],True))\n", - " \n", - "\n", - "# @TODO : does it cover indeed all cases\n", - "# Where to sort it -> entities?\n", - "def create_gpr(reaction,gene):\n", - "\n", - " # Step 1: test, if there is already a gpr\n", - " # ---------------------------------------\n", - " old_association_str = None\n", - " old_association_fbc = None\n", - " if reaction.getPlugin(0).getGeneProductAssociation():\n", - " old_association = reaction.getPlugin(0).getGeneProductAssociation().getListOfAllElements()\n", - " # case 1: only a single association\n", - " if len(old_association) == 1 and isinstance(old_association[0],GeneProductRef):\n", - " old_association_str = old_association[0].getGeneProduct()\n", - " # case 2: nested structure of asociations\n", - " elif isinstance(old_association[0], FbcOr) or isinstance(old_association[0], FbcAnd):\n", - " old_association_fbc = old_association[0].clone()\n", - " # this should get the highest level association (that includes all others)\n", - "\n", - " \n", - " # Step 2: create new gene product association \n", - " # -------------------------------------------\n", - " if old_association_str and isinstance(gene,str):\n", - " gene = [old_association_str,id]\n", - " elif old_association_str and isinstance(gene,list):\n", - " gene.append(old_association_str)\n", - " \n", - " # add the old association rule as an 'OR' (if needed)\n", - " if not old_association_fbc:\n", - " new_association = reaction.getPlugin(0).createGeneProductAssociation()\n", - " else:\n", - " new_association = reaction.getPlugin(0).createGeneProductAssociation().createOr()\n", - " new_association.addAssociation(old_association_fbc)\n", - "\n", - " # add the remaining genes \n", - " # @TODO currently, only connection possible is 'OR'\n", - " if isinstance(gene,str):\n", - " new_association.createGeneProductRef().setGeneProduct(gene)\n", - " elif isinstance(gene,list) and len(gene) == 1:\n", - " new_association.createGeneProductRef().setGeneProduct(gene[0])\n", - " elif isinstance(gene,list) and len(gene) > 1:\n", - " gpa_or = new_association.createOr()\n", - " for i in gene:\n", - " gpa_or.createGeneProductRef().setGeneProduct(i)\n", - " \n", - "\n", - "# @TODO seems very ridgid, beter ways to find the ids?\n", - "# probably sort into GapFiller\n", - "def add_gene_reac_associations_from_table(model,reac_table:pd.DataFrame):\n", - " \n", - " model_gene_ids = [_.getId() for _ in model.getPlugin(0).getListOfGeneProducts()]\n", - " \n", - " # get each unique ncbiprotein vs reaction mapping\n", - " reac_table = reac_table[['ncbiprotein','add_to_GPR']]\n", - " reac_table = reac_table.explode('ncbiprotein').explode('add_to_GPR')\n", - " reac_table.drop_duplicates(inplace=True)\n", - " \n", - " # add the genes to the corresponding GPRs\n", - " for idx,row in reac_table.iterrows():\n", - " # check, if G_+ncbiprotein in model\n", - " # if yes, add gpr\n", - " geneid = 'G_'+row['ncbiprotein'].replace('.','_')\n", - " reacid = 'R_'+row['add_to_GPR']\n", - " if geneid in model_gene_ids:\n", - " create_gpr(model.getReaction(reacid),geneid)\n", - " # else, print warning\n", - " else:\n", - " mes = f'Cannot find {geneid} in model. Should be added to {reacid}'\n", - " warnings.warn(mes,UserWarning)\n", - " \n", - "\n", "def fill_model(model, missing_genes:pd.DataFrame, \n", " missing_reacs:pd.DataFrame):\n", " \n", @@ -352,155 +339,6 @@ " cobramodel = load_model(tmp.name,'cobra')" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Parse reaction string" - ] - }, - { - "cell_type": "code", - "execution_count": 123, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "({'C00024': 1.0, 'C00025': 1.0}, {'C00010': 1.0, 'C00624': 1.0}, None, True)" - ] - }, - "execution_count": 123, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "\n", - "equation = '1 MNXM1100221@MNXD1 + 1 MNXM735047@MNXD1 + 1 MNXM9@MNXD1 = 1 MNXM1100222@MNXD1 + 1 MNXM286@MNXD1'\n", - "equation2 = 'aspsa_c + nadp_c + pi_c <-> 4pasp_c + h_c + nadph_c'\n", - "equation3 = 'C00024 + C00025 <=> C00010 + C00624'\n", - "\n", - "# @TODO: BioCyc missing\n", - "def parse_reac_str(equation, type='MetaNetX'):\n", - "\n", - " products = {}\n", - " reactants = {}\n", - " compartments = list()\n", - " is_product = False\n", - " reversible = True\n", - "\n", - " match type:\n", - " case 'MetaNetX':\n", - " for s in equation.split(' '):\n", - " # switch from reactants to products\n", - " if s == '=':\n", - " is_product = True\n", - " # found stoichiometric factor\n", - " elif s.isnumeric():\n", - " factor = float(s)\n", - " # skip\n", - " elif s == '+':\n", - " continue\n", - " # found metabolite\n", - " else:\n", - " # get information from MetaNetX\n", - " metabolite, compartment = s.split('@')\n", - " compartments.append(compartment)\n", - " \n", - " if is_product:\n", - " products[metabolite] = factor\n", - " else:\n", - " reactants[metabolite] = factor\n", - " \n", - " case 'BiGG':\n", - " factor = 1.0 # BiGG does not use factor 1 in the quations\n", - " for s in equation.split(' '):\n", - " # found factor\n", - " if s.replace('.','').isdigit():\n", - " factor = float(s)\n", - " # switch from reactants to products\n", - " elif s == '-->' :\n", - " is_product = True\n", - " reversible = False\n", - " elif s == '<->':\n", - " is_product = True\n", - " # skip\n", - " elif s == '+':\n", - " continue\n", - " # found metabolite\n", - " else:\n", - " compartments.append(s.rsplit('_',1)[1])\n", - " if is_product:\n", - " products[s] = factor\n", - " else:\n", - " reactants[s] = factor\n", - " factor = 1.0\n", - " \n", - " case 'KEGG':\n", - " compartments = None\n", - " factor = 1.0\n", - " for s in equation.split(' '):\n", - " if s.isnumeric():\n", - " factor = float(s)\n", - " elif s == '+':\n", - " continue\n", - " elif s == '<=>': # @TODO are there more options?\n", - " is_product = True\n", - " else:\n", - " if is_product:\n", - " products[s] = factor\n", - " else:\n", - " reactants[s] = factor\n", - " factor = 1.0\n", - " \n", - " case 'BioCyc':\n", - " pass\n", - " \n", - " return (reactants,products,compartments,reversible)\n", - " \n", - " \n", - " \n", - "parse_reac_str(equation3,'KEGG')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### check, if a reaction should be added or not" - ] - }, - { - "cell_type": "code", - "execution_count": 21, - "metadata": {}, - "outputs": [], - "source": [ - "# originally from SPECIMEN HQTB\n", - "# @TODO\n", - "def isreaction_complete(reac:cobra.Reaction, \n", - " exclude_dna:bool=True, exclude_rna:bool=True) -> bool:\n", - "\n", - " # check reaction\n", - " if exclude_dna and 'DNA' in reac.name:\n", - " return False\n", - " if exclude_rna and 'RNA' in reac.name:\n", - " return False\n", - "\n", - " # check metabolites\n", - " for m in reac.metabolites:\n", - " if m.id == '' or pd.isnull(m.id):\n", - " return False\n", - " if m.name == '' or pd.isnull(m.name):\n", - " return False\n", - " if m.formula == '' or pd.isnull(m.formula):\n", - " return False\n", - "\n", - " return True\n", - "\n" - ] - }, { "cell_type": "markdown", "metadata": {}, @@ -510,886 +348,232 @@ }, { "cell_type": "code", - "execution_count": 23, - "metadata": {}, - "outputs": [], - "source": [ - "# some decorators\n", - "def template(func):\n", - " def wrapper():\n", - " print('This function is a template for developers.\\nThis cannot be executed.')\n", - " return wrapper\n", - "\n", - "def implement(func):\n", - " def wrapper():\n", - " print('The current function is just a placeholder and will be implement in the fucture.')\n", - " return wrapper" - ] - }, - { - "cell_type": "code", - "execution_count": 124, - "metadata": {}, - "outputs": [], - "source": [ - "from refinegems.utility.io import load_a_table_from_database\n", - "from refinegems.utility.entities import create_random_id, match_id_to_namespace\n", - "import cobra\n", - "import pandas as pd\n", - "from typing import Literal\n", - "from Bio.KEGG import REST, Compound\n", - "import urllib\n", - "import requests \n", - "import json\n", - "\n", - "# @TODO : name is an issue\n", - "def get_BiGG_metabs_annotation_via_dbid(metabolite, id, dbcol, compartment):\n", - " if not 'bigg.metabolite' in metabolite.annotation.keys():\n", - " bigg_search = load_a_table_from_database(\n", - " f'SELECT * FROM bigg_metabolites WHERE \\'{dbcol}\\' = \\'{id}\\'',\n", - " query=True)\n", - " if len(bigg_search) > 0:\n", - " metabolite.annotation['bigg.metabolite'] = [_ for _ in bigg_search['id'].tolist() if _.endswith(f'_{compartment}')]\n", - " if len(metabolite.annotation['bigg.metabolite']) == 0:\n", - " metabolite.annotation.pop('bigg.metabolite')\n", - "\n", - "\n", - "def add_annotations_from_BiGG_metabs(metabolite:cobra.Metabolite):\n", - " if 'bigg.metabolite' in metabolite.annotation.keys():\n", - " bigg_information = load_a_table_from_database(\n", - " 'SELECT * FROM bigg_metabolites WHERE id = \\'' + f'\\' OR id = \\''.join(metabolite.annotation['bigg.metabolite']) + '\\'',\n", - " query=True)\n", - " db_id_bigg = {'BioCyc':'biocyc', 'MetaNetX (MNX) Chemical':'metanetx.chemical','SEED Compound':'seed.compound','CHEBI':'chebi', 'KEGG Compound':'kegg.compound'}\n", - " for db in db_id_bigg:\n", - " info = list(set(bigg_information[db].dropna().to_list()))\n", - " if len(info) > 0:\n", - " info = ','.join(info)\n", - " info = [x.strip() for x in info.split(',')] # make sure all entries are a separate list object\n", - " if db_id_bigg[db] in metabolite.annotation.keys():\n", - " metabolite.annotation[db_id_bigg[db]] = list(set(info + metabolite.annotation[db_id_bigg[db]]))\n", - " else:\n", - " metabolite.annotation[db_id_bigg[db]] = info\n", - "\n", - "\n", - "@template\n", - "def build_metabolite_xxx(id:str, model:cobra.Model, \n", - " namespace:str,\n", - " compartment:str,\n", - " idprefix:str) -> cobra.Metabolite: \n", - " # check if id in model\n", - " # get information via id\n", - " # collection formation in a new metabolite object\n", - " # add more annotations from other databases\n", - " # adjust namespace\n", - " # check model again for new namespace\n", - " pass\n", - "\n", - "# originally from SPECIMEN\n", - "# @TODO some issues left\n", - "# current version works on a couple of examples \n", - "def build_metabolite_mnx(id: str, model:cobra.Model, \n", - " namespace:str='BiGG',\n", - " compartment:str='c',\n", - " idprefix:str='refineGEMs') -> cobra.Metabolite | None:\n", - "\n", - " # fast check if compound already in model\n", - " # ------------------------------------------\n", - " # step 1: check if MetaNetX ID in model\n", - " matches = [x.id for x in model.metabolites if 'metanetx.chemical' in x.annotation and x.annotation['metanetx.chemical']==id and x.compartment == compartment]\n", - "\n", - " # step 2: if yes, retrieve metabolite from model\n", - " # case 1: multiple matches found\n", - " if len(matches) > 0:\n", - " if len(matches) > 1:\n", - " # ................\n", - " # @TODO what to do\n", - " # currently, just the first one is taken\n", - " # ................\n", - " match = model.metabolites.get_by_id(matches[0])\n", - " # case 2: only one match found\n", - " else:\n", - " match = model.metabolites.get_by_id(matches[0])\n", - "\n", - " # step 3: add metabolite\n", - " return match\n", - "\n", - " # if not, create new metabolite\n", - " # -----------------------------\n", - " metabolite_prop = load_a_table_from_database(f'SELECT * FROM mnx_chem_prop WHERE id = \\'{id}\\'')\n", - " metabolite_anno = load_a_table_from_database(f'SELECT * FROM mnx_chem_xref WHERE id = \\'{id}\\'')\n", - " if len(metabolite_prop) == 0: # cannot construct metabolite\n", - " return None\n", - " else:\n", - " \n", - " # step 1: create a random metabolite ID\n", - " new_metabolite = cobra.Metabolite(create_random_id(model, 'meta', idprefix)) \n", - "\n", - " # step 2: add features\n", - " # --------------------\n", - " new_metabolite.formula = metabolite_prop['formula'].iloc[0]\n", - " new_metabolite.name = metabolite_prop['name'].iloc[0]\n", - " new_metabolite.charge = metabolite_prop['charge'].iloc[0]\n", - " new_metabolite.compartment = compartment\n", - "\n", - " # step 3: add notes\n", - " # -----------------\n", - " new_metabolite.notes['created with'] = 'refineGEMs GapFiller, metanetx.chemical'\n", - "\n", - " # step 4: add annotations\n", - " # -----------------------\n", - " # add SBOTerm\n", - " new_metabolite.annotation['sbo'] = 'SBO:0000247'\n", - " \n", - " # add information directly available from the mnx_chem_prop table \n", - " new_metabolite.annotation['metanetx.chemical'] = [metabolite_prop['id'].iloc[0]]\n", - " if not pd.isnull(metabolite_prop['InChIKey'].iloc[0]):\n", - " new_metabolite.annotation['inchikey'] = metabolite_prop['InChIKey'].iloc[0].split('=')[1]\n", - " \n", - " # get more annotation from the mnx_chem_xref table\n", - " for db in ['kegg.compound','metacyc.compound','seed.compound','bigg.metabolite','chebi']:\n", - " db_matches = metabolite_anno[metabolite_anno['source'].str.contains(db)]\n", - " if len(db_matches) > 0:\n", - " new_metabolite.annotation[db] = [m.split(':',1)[1] for m in db_matches['source'].tolist()]\n", - "\n", - " # Cleanup BiGG annotations (MetaNetX only saves universal)\n", - " # @TODO : there is no guarantee, that the id with the specific compartment actually exists -> still do it? // kepp the universal id?\n", - " new_metabolite.annotation['bigg.metabolite'] = [_+'_'+compartment for _ in new_metabolite.annotation['bigg.metabolite']]\n", - " # if no BiGG was found in MetaNetX, try reverse search in BiGG\n", - " get_BiGG_metabs_annotation_via_dbid(new_metabolite, id, 'MetaNetX (MNX) Chemical', compartment)\n", - " \n", - " # add additional information from BiGG (if ID found) \n", - " add_annotations_from_BiGG_metabs(new_metabolite)\n", - "\n", - " # step 5: change ID according to namespace\n", - " # ----------------------------------------\n", - " match_id_to_namespace(new_metabolite,namespace)\n", - " \n", - " # step 6: re-check existence of ID in model\n", - " # -----------------------------------------\n", - " # @TODO : check complete annotations? \n", - " # - or let those be covered by the duplicate check later on?\n", - " if new_metabolite.id in [_.id for _ in model.metabolites]:\n", - " return model.metabolites.get_by_id(new_metabolite.id)\n", - " \n", - " return new_metabolite\n", - "\n", - "\n", - "# originally from SPECIMEN\n", - "# @TODO some issues left\n", - "# current version works on a couple of examples \n", - "def build_metabolite_kegg(kegg_id:str, model:cobra.Model, \n", - " namespace:Literal['BiGG']='BiGG', \n", - " compartment:str='c',\n", - " idprefix='refineGEMs') -> cobra.Metabolite | None:\n", - "\n", - " \n", - " # ---------------------------------------\n", - " # fast check if compound already in model\n", - " # ---------------------------------------\n", - " # step 1: check via KEGG ID\n", - " matches = [x.id for x in model.metabolites if ('kegg.compound' in x.annotation and x.annotation['kegg.compound'] == kegg_id)]\n", - " if len(matches) > 0:\n", - " # step 2: model id --> metabolite object\n", - " # case 1: multiple matches found\n", - " if len(matches) > 1:\n", - " # .......\n", - " # @TODO\n", - " # .......\n", - " match = model.metabolites.get_by_id(matches[0])\n", - " # case 2: only one match found\n", - " else:\n", - " match = model.metabolites.get_by_id(matches[0])\n", - "\n", - " # step 3: add metabolite\n", - " return match\n", - "\n", - " # -----------------------------\n", - " # if not, create new metabolite\n", - " # -----------------------------\n", - " \n", - " # step 1: retrieve KEGG entry for compound\n", - " # ----------------------------------------\n", - " try:\n", - " kegg_handle = REST.kegg_get(kegg_id)\n", - " kegg_record = [r for r in Compound.parse(kegg_handle)][0]\n", - " except urllib.error.HTTPError:\n", - " warnings.warn(F'HTTPError: {kegg_id}')\n", - " return None\n", - " except ConnectionResetError:\n", - " warnings.warn(F'ConnectionResetError: {kegg_id}')\n", - " return None\n", - " except urllib.error.URLError:\n", - " warnings.warn(F'URLError: {kegg_id}')\n", - " return None\n", - "\n", - " # step 2: create a random metabolite ID\n", - " # -------------------------------------\n", - " new_metabolite = cobra.Metabolite(create_random_id(model, 'meta',idprefix)) \n", - "\n", - " # step 3: add features\n", - " # --------------------\n", - " # set name from KEGG and additionally use it as ID if there is none yet\n", - " if isinstance(kegg_record.name, list):\n", - " # @TODO : better way to choose a name than to just take the first entry???\n", - " new_metabolite.name = kegg_record.name[0]\n", - " else:\n", - " new_metabolite.name = kegg_record.name\n", - " # set compartment\n", - " new_metabolite.compartment = compartment\n", - " # set formula\n", - " new_metabolite.formula = kegg_record.formula\n", - "\n", - " # step 4: add notes\n", - " # -----------------\n", - " new_metabolite.notes['created with'] = 'refineGEMs GapFiller, KEGG.compound'\n", - "\n", - " # step 5: add annotations\n", - " # -----------------------\n", - " # add annotation from the KEGG entry\n", - " new_metabolite.annotation['kegg.compound'] = kegg_id\n", - " db_idtf = {'CAS':'cas','PubChem':'pubchem.compound','ChEBI':'chebi'}\n", - " for db,ids in kegg_record.dblinks:\n", - " if db in db_idtf:\n", - " new_metabolite.annotation[db_idtf[db]] = ids\n", - " \n", - " # add SBOTerm\n", - " new_metabolite.annotation['sbo'] = 'SBO:0000247'\n", - "\n", - " # search for infos in MetaNetX\n", - " # @TODO, since the table are readily available at the database now\n", - " mnx_info = load_a_table_from_database(\n", - " f'SELECT * FROM mnx_chem_xref WHERE source = \\'kegg.compound:{kegg_id}\\'',\n", - " query=True\n", - " )\n", - " if len(mnx_info) > 0:\n", - " mnx_ids = list(set(mnx_info['id']))\n", - " # mapping is unambiguously\n", - " if len(mnx_ids) == 1:\n", - " mnx_info = load_a_table_from_database(\n", - " f'SELECT * FROM mnx_chem_prop WHERE id = \\'{mnx_ids[0]}\\'',\n", - " query=True\n", - " )\n", - " # add charge \n", - " new_metabolite.charge = mnx_info['charge'].iloc[0]\n", - " # add more annotations\n", - " new_metabolite.annotation['metanetx.chemical'] = [mnx_info['id'].iloc[0]]\n", - " if not pd.isnull(mnx_info['InChIKey'].iloc[0]):\n", - " new_metabolite.annotation['inchikey'] = mnx_info['InChIKey'].iloc[0].split('=')[1]\n", - " \n", - " # get more annotation from the mnx_chem_xref table \n", - " metabolite_anno = load_a_table_from_database(f'SELECT * FROM mnx_chem_xref WHERE id = \\'{mnx_info[\"id\"]}\\'')\n", - " for db in ['kegg.compound','metacyc.compound','seed.compound','bigg.metabolite','chebi']:\n", - " db_matches = metabolite_anno[metabolite_anno['source'].str.contains(db)]\n", - " if len(db_matches) > 0:\n", - " mnx_tmp = [m.split(':',1)[1] for m in db_matches['source'].tolist()]\n", - " if db in new_metabolite.annotation.keys():\n", - " new_metabolite.annotation[db] = list(set(mnx_tmp + new_metabolite.annotation[db]))\n", - " else:\n", - " new_metabolite.annotation[db] = mnx_tmp\n", - "\n", - " else:\n", - " pass\n", - " # @TODO : how to handle multiple matches, e.g. getting charge will be complicated\n", - " \n", - " # Cleanup BiGG annotations (MetaNetX only saves universal)\n", - " # @TODO : there is no guarantee, that the id with the specific compartment actually exists -> still do it? // kepp the universal id?\n", - " if 'bigg.metabolite' in new_metabolite.annotation.keys():\n", - " new_metabolite.annotation['bigg.metabolite'] = [_+'_'+compartment for _ in new_metabolite.annotation['bigg.metabolite']]\n", - " \n", - " # if no BiGG ID, try reverse search\n", - " get_BiGG_metabs_annotation_via_dbid(new_metabolite, id, 'KEGG Compound', compartment)\n", - " \n", - " # search for annotations in BiGG\n", - " add_annotations_from_BiGG_metabs(new_metabolite)\n", - "\n", - " # step 6: change ID according to namespace\n", - " # ----------------------------------------\n", - " match_id_to_namespace(new_metabolite,namespace)\n", - " \n", - " # step 7: re-check existence of ID in model\n", - " # -----------------------------------------\n", - " # @TODO : check complete annotations? \n", - " # - or let those be covered by the duplicate check later on?\n", - " if new_metabolite.id in [_.id for _ in model.metabolites]:\n", - " return model.metabolites.get_by_id(new_metabolite.id)\n", - "\n", - " return new_metabolite\n", - "\n", - "\n", - "# @TEST some more, somewhat works, but who knows...\n", - "# @TODO some comments inside\n", - "# @NOTE expects the non-universal BiGG ID (meaning the one with the compartment abbreviation\n", - "# at the end) -> change behaviour or keep it?\n", - "def build_metabolite_bigg(id:str, model:cobra.Model, \n", - " namespace:Literal['BiGG']='BiGG',\n", - " idprefix:str='refineGEMs') -> cobra.Metabolite: \n", - " \n", - " compartment = id.rsplit('_',1)[1]\n", - " # ------------------------------------------\n", - " # fast check if compound already in model\n", - " # ------------------------------------------\n", - " # step 1: check if MetaNetX ID in model\n", - " matches = [x.id for x in model.metabolites if 'bigg.metabolite' in x.annotation and (x.annotation['bigg.metabolite']==id or x.annotation['bigg.metabolite']==id.rsplit('_',1)[0]) and x.compartment == compartment]\n", - " # step 2: if yes, retrieve metabolite from model\n", - " # case 1: multiple matches found\n", - " if len(matches) > 0:\n", - " if len(matches) > 1:\n", - " # ................\n", - " # @TODO what to do\n", - " # currently, just the first one is taken\n", - " # ................\n", - " match = model.metabolites.get_by_id(matches[0])\n", - " # case 2: only one match found\n", - " else:\n", - " match = model.metabolites.get_by_id(matches[0])\n", - "\n", - " # step 3: add metabolite\n", - " return match\n", - " \n", - " # -----------------------------\n", - " # if not, create new metabolite\n", - " # -----------------------------\n", - " # get information from the database\n", - " bigg_res = load_a_table_from_database(\n", - " f'SELECT * FROM bigg_metabolites WHERE id = \\'{id}\\'',\n", - " query=True)\n", - " if len(bigg_res) > 0:\n", - " bigg_res = bigg_res.iloc[0,:]\n", - " else:\n", - " return None # not data = no recontruction\n", - " # get information from MNX if ID available\n", - " mnx_res=None\n", - " if bigg_res['MetaNetX (MNX) Chemical']:\n", - " mnx_res = load_a_table_from_database(\n", - " f'SELECT * FROM mnx_chem_prop WHERE id=\\'{bigg_res[\"MetaNetX (MNX) Chemical\"]}\\'',\n", - " query=True)\n", - " if len(mnx_res) > 0:\n", - " mnx_res = mnx_res.iloc[0,:]\n", - " else:\n", - " mnx_res=None \n", - " \n", - " # step 1: create a random metabolite ID\n", - " # -------------------------------------\n", - " new_metabolite = cobra.Metabolite(create_random_id(model, 'meta', idprefix)) \n", - " \n", - " # step 2: add features\n", - " # --------------------\n", - " new_metabolite.name = bigg_res['name']\n", - " new_metabolite.compartment = compartment\n", - " if mnx_res is not None:\n", - " if mnx_res['charge']:\n", - " new_metabolite.charge = mnx_res['charge']\n", - " if mnx_res['formula']:\n", - " new_metabolite.formula = mnx_res['formula']\n", - " \n", - " if not new_metabolite.formula or not new_metabolite.charge:\n", - " try:\n", - " bigg_fetch = json.loads(requests.get(f'http://bigg.ucsd.edu/api/v2/universal/metabolites/{id.rsplit(\"_\",1)[0]}').text)\n", - " if 'formulae' in bigg_fetch.keys() and not new_metabolite.formula: \n", - " new_metabolite.formula = bigg_fetch['formulae'][0]\n", - " if 'charges' in bigg_fetch.keys() and new_metabolite.charge: \n", - " new_metabolite.charge = bigg_fetch['charges'][0]\n", - " except Exception as e:\n", - " # @TODO\n", - " pass\n", - " \n", - " # step 3: add notes\n", - " # -----------------\n", - " new_metabolite.notes['created with'] = 'refineGEMs GapFiller, BiGG'\n", - "\n", - " # step 4: add annotations\n", - " # -----------------------\n", - " # add SBOTerm\n", - " new_metabolite.annotation['sbo'] = 'SBO:0000247'\n", - " # add infos from BiGG\n", - " new_metabolite.annotation['bigg.metabolite'] = [id] # @TODO or use the universal id?\n", - " add_annotations_from_BiGG_metabs(new_metabolite)\n", - " # add annotations from MNX\n", - " if mnx_res is not None:\n", - " if mnx_res['InChI'] and 'inchi' not in new_metabolite.annotation.keys():\n", - " new_metabolite.annotation['inchi'] = mnx_res['InChI']\n", - " if mnx_res['InChIKey'] and 'inchikey' not in new_metabolite.annotation.keys():\n", - " new_metabolite.annotation['inchikey'] = mnx_res['InChIKey']\n", - " \n", - " # step 5: change ID according to namespace\n", - " # ----------------------------------------\n", - " match_id_to_namespace(new_metabolite,namespace)\n", - " \n", - " # step 6: re-check existence of ID in model\n", - " # -----------------------------------------\n", - " # @TODO : check complete annotations? \n", - " # - or let those be covered by the duplicate check later on?\n", - " if new_metabolite.id in [_.id for _ in model.metabolites]:\n", - " return model.metabolites.get_by_id(new_metabolite.id)\n", - " \n", - " return new_metabolite\n", - " \n", - "\n", - "@implement\n", - "def build_metabolite_biocyc(id:str, model:cobra.Model, \n", - " namespace:str,\n", - " compartment:str,\n", - " idprefix:str) -> cobra.Metabolite: \n", - " pass" - ] - }, - { - "cell_type": "code", - "execution_count": 129, + "execution_count": 147, "metadata": {}, "outputs": [], "source": [ "from refinegems.curation.db_access.kegg import kegg_reaction_parser\n", - "# general functions\n", - "# -----------------\n", - "\n", - "# TODO\n", - "# extend the build function so, that all of them can take either the id or an equation \n", - "# as input for rebuilding the reaction (would also be beneficial for semi-manual curation)\n", - "\n", - "# NOTE: \n", - "# (final) validation of reaction in all reactions missing\n", - "# --> check reaction for completness etc.\n", - "# --> maybe better to put it outside the build_... functions\n", - "# --> like an extra check for the pipeline, also better for\n", - "# filtering out DNA / RNA reactions if neccessary\n", - "\n", - "def _add_annotations_from_bigg_reac_row(row, reac):\n", - " dbnames = {'RHEA':'rhea','BioCyc':'biocyc','MetaNetX (MNX) Equation':'metanetx.reaction','EC Number':'ec-code'}\n", - " for dbname,dbprefix in dbnames.items():\n", - " if row[dbname]:\n", - " ids_to_add = row[dbname].split(',')\n", - " if dbprefix in reac.annotation.keys():\n", - " reac.annotation[dbprefix] = list(set(reac.annotation[dbprefix]).union(set(ids_to_add)))\n", - " else:\n", - " reac.annotation[dbprefix] = ids_to_add\n", - "\n", - "def _add_annotations_from_dict_cobra(references, entity):\n", - " # add additional references from the parameter\n", - " for db,idlist in references.items():\n", - " if not isinstance(idlist,list):\n", - " idlist = [idlist]\n", - " if db in entity.annotation.keys():\n", - " entity.annotation[db] = list(set(entity.annotation[db] + idlist))\n", - " else:\n", - " entity.annotation[db] = idlist\n", - "\n", - "\n", - "@template\n", - "def build_reaction_xxx():\n", - " pass\n", - "\n", - "\n", - "# @TEST (more) - tries some cases, in which it seems to work\n", - "# @TODO\n", - "def build_rection_mnx(model, id,\n", - " reac_str:str = None,\n", - " references:dict={},\n", - " idprefix='refineGEMs',\n", - " namespace:Literal['BiGG']='BiGG') -> cobra.Reaction | None | list:\n", - " \n", - " # ---------------------\n", - " # check, if ID in model\n", - " # ---------------------\n", - " matches_found = [_.id for _ in model.reactions if 'metanetx.reaction' in _.annotation.keys() and _.annotation['metanetx.reaction']==id]\n", - " if len(matches_found) > 0:\n", - " return matches_found\n", - " \n", - " # -----------------------------\n", - " # otherwise, build new reaction\n", - " # -----------------------------\n", - " \n", - " # get relevant part of table from database\n", - " mnx_reac_refs = load_a_table_from_database(\n", - " f'SELECT * FROM mnx_reac_xref WHERE id = \\'{id}\\'',\n", - " query=True)\n", - " mnx_reac_refs = mnx_reac_refs[~(mnx_reac_refs['description']=='secondary/obsolete/fantasy identifier')]\n", - " \n", - " # create reaction object\n", - " new_reac = cobra.Reaction(create_random_id(model,'reac',idprefix))\n", - " \n", - " # set name of reaction\n", - " name = ''\n", - " for desc in mnx_reac_refs['description']:\n", - " if '|' in desc: # entry has a name and an equation string\n", - " name = desc.split('|')[0]\n", - " break # one name is enough\n", - " new_reac.name = name \n", - " \n", - " # get metabolites\n", - " # ---------------\n", - " if not reac_str:\n", - " mnx_reac_prop = load_a_table_from_database(\n", - " f'SELECT * FROM mnx_reac_prop WHERE id = \\'{id}\\'',\n", - " query=True)\n", - " reac_str = mnx_reac_prop['mnx_equation'][0]\n", - " if mnx_reac_prop['ec-code'][0]:\n", - " references['ec-code'] = mnx_reac_prop['ec-code'][0]\n", - " \n", - " if reac_str:\n", - " reactants,products,comparts,rev = parse_reac_str(reac_str,'MetaNetX')\n", - " else:\n", - " return None\n", - " # ............................................................\n", - " # @TODO / Issue\n", - " # reac_prop / mnx equation only saves generic compartments 1 and 2 (MNXD1 / MNXD2)\n", - " # how to get the (correct) compartment?\n", - " # current solution 1 -> c, 2 -> e\n", - " comparts = ['c' if _ == 'MNXD1' else 'e' for _ in comparts ]\n", - " # ............................................................\n", - " metabolites = {}\n", - " meta_counter = 0\n", - " \n", - " # reconstruct reactants\n", - " for mid,factor in reactants.items():\n", - " tmp_meta = build_metabolite_mnx(mid,model,\n", - " namespace,\n", - " comparts[meta_counter],idprefix)\n", - " if tmp_meta:\n", - " metabolites[tmp_meta] = -1*factor\n", - " meta_counter += 1\n", - " else:\n", - " return None # not able to build reaction successfully\n", - " \n", - " # reconstruct products\n", - " for mid,factor in products.items():\n", - " tmp_meta = build_metabolite_mnx(mid,model,\n", - " namespace,\n", - " comparts[meta_counter],idprefix)\n", - " if tmp_meta:\n", - " metabolites[tmp_meta] = factor\n", - " meta_counter += 1\n", - " else:\n", - " return None # not able to build reaction successfully\n", - " \n", - " # add metabolites to reaction\n", - " # @TODO: does it need some kind of try and error, if - for some highly unlikely reason - two newly generated ids are the same\n", - " new_reac.add_metabolites(metabolites)\n", - " \n", - " # set reversibility\n", - " if rev:\n", - " new_reac.bounds = (1000.0,1000.0)\n", - " else:\n", - " new_reac.bounds = (0.0,1000.0)\n", - " \n", - " # get annotations\n", - " # ---------------\n", - " new_reac.annotation['sbo'] = 'SBO:0000167'\n", - " # get more annotation from the mnx_reac_xref table\n", - " for db in ['bigg.reaction','kegg.reaction','seed.reaction','metacyc.reaction','rhea']:\n", - " db_matches = mnx_reac_refs[mnx_reac_refs['source'].str.contains(db)]\n", - " if len(db_matches) > 0:\n", - " new_reac.annotation[db] = [m.split(':',1)[1] for m in db_matches['source'].tolist()]\n", - " # update reactions direction, if MetaCyc has better information\n", - " if db == 'metacyc.reaction' and len(db_matches[db_matches['source'].str.contains('-->')]):\n", - " new_reac.bounds = (0.0,1000.0)\n", - " # add additional references from the parameter\n", - " _add_annotations_from_dict_cobra(references,new_reac)\n", - " \n", - " # get annotations\n", - " # ---------------\n", - " new_reac.annotation['sbo'] = 'SBO:0000167'\n", - "\n", - " # add notes\n", - " # ---------\n", - " new_reac.notes['created with'] = 'refineGEMs GapFiller, MetaNetX'\n", - " \n", - " # match ID to namespace\n", - " # ---------------------\n", - " match_id_to_namespace(new_reac,namespace)\n", - " \n", - " return new_reac\n", - "\n", - "\n", - "# @TEST (more) - tries some cases, in which it seems to work\n", - "# @TODO some things still open (for discussion)\n", - "def build_reaction_kegg(model, id:str=None, reac_str:str=None,\n", - " references:dict={},\n", - " idprefix='refineGEMs',\n", - " namespace:Literal['BiGG']='BiGG'):\n", - " \n", - " # either reaction id or a reaction string needed for reconstruction\n", - " if not id and not reac_str:\n", - " return None # reconstruction not possible\n", - " \n", - " # create an empty reaction with random id\n", - " new_reac = cobra.Reaction(create_random_id(model,'reac',idprefix))\n", - " \n", - " # -------------\n", - " # KEGG ID given\n", - " # -------------\n", - " if id:\n", - " # check, if reaction in model\n", - " matches = [_.id for _ in model.reactions if 'kegg.reaction' in _.annotation.keys() and _.annotation['kegg.reaction']==id]\n", - " if len(matches) > 0:\n", - " return matches # return matched reaction ids in list\n", - " \n", - " # retrieve information from KEGG\n", - " kegg_info = kegg_reaction_parser(id)\n", - " if kegg_info:\n", - " if 'name' in kegg_info.keys():\n", - " new_reac.name = kegg_info['name']\n", - " if 'equation' in kegg_info.keys():\n", - " reac_str = kegg_info['equation'] if not reac_str else reac_str\n", - " if 'db' in kegg_info.keys():\n", - " new_reac.annotation = kegg_info['db']\n", - " \n", - " # -------------------------------------\n", - " # Reaction reconstruction from equation\n", - " # -------------------------------------\n", - " \n", - " # skip, if not reaction string is available\n", - " if not reac_str:\n", - " return None # reconstruction not possible\n", - " \n", - " # parse reaction string\n", - " reactants,products,comparts,rev = parse_reac_str(reac_str,'KEGG')\n", - " \n", - " # ..............................................\n", - " # @TODO\n", - " # KEGG has no information about compartments !!!\n", - " # current solution: always use c\n", - " compartment = 'c'\n", - " # ..............................................\n", - " metabolites = {}\n", - " meta_counter = 0\n", - " \n", - " # reconstruct reactants\n", - " for mid,factor in reactants.items():\n", - " tmp_meta = build_metabolite_kegg(mid,model,\n", - " namespace,\n", - " compartment,idprefix)\n", - " if tmp_meta:\n", - " metabolites[tmp_meta] = -1*factor\n", - " meta_counter += 1\n", - " else:\n", - " return None # not able to build reaction successfully\n", - " \n", - " # reconstruct products\n", - " for mid,factor in products.items():\n", - " tmp_meta = build_metabolite_kegg(mid,model,\n", - " namespace,\n", - " compartment,idprefix)\n", - " if tmp_meta:\n", - " metabolites[tmp_meta] = factor\n", - " meta_counter += 1\n", - " else:\n", - " return None # not able to build reaction successfully\n", - " \n", - " # add metabolites to reaction\n", - " # @TODO: does it need some kind of try and error, if - for some highly unlikely reason - two newly generated ids are the same\n", - " new_reac.add_metabolites(metabolites)\n", - " \n", - " # set reversibility\n", - " if rev:\n", - " new_reac.bounds = (1000.0,1000.0)\n", - " else:\n", - " new_reac.bounds = (0.0,1000.0)\n", - " \n", - " # --------------------\n", - " # add more information\n", - " # --------------------\n", - " new_reac.annotation['sbo'] = 'SBO:0000167'\n", - " # get more information from searching the KEGG ID in BiGG\n", - " bigg_res = load_a_table_from_database(\n", - " f'SELECT * FROM bigg_reactions WHERE \\\"KEGG Reaction\\\" = \\'{id}\\'',\n", - " query=True)\n", - " for idx,row in bigg_res.iterrows():\n", - " r,p,compartments,r = parse_reac_str(row['reaction_string'],'BiGG')\n", - " # .........................................\n", - " # @TODO part 2 of compartment issue\n", - " # find the reaction with 'c' as compartment\n", - " if len(set(compartments)) == 1 and compartments[0] == 'c':\n", - " new_reac.annotation['bigg.reaction'] = row['id']\n", - " # @TODO add more information, exclude None entries\n", - " _add_annotations_from_bigg_reac_row(row, new_reac)\n", - " break\n", - " # .........................................\n", - "\n", - " # @IDEA / @TODO get more information from MetaNetX\n", - " \n", - " # add additional references from the parameter\n", - " _add_annotations_from_dict_cobra(references,new_reac)\n", - " \n", - " # add notes\n", - " # ---------\n", - " new_reac.notes['created with'] = 'refineGEMs GapFiller, KEGG'\n", - " \n", - " # match ID to namespace\n", - " # ---------------------\n", - " match_id_to_namespace(new_reac,namespace)\n", - " \n", - " return new_reac\n", + "from tqdm import tqdm\n", "\n", + "# GapFiller functions\n", + "# -------------------\n", "\n", - "# @TEST\n", - "# @TODO some things still open (for discussion)\n", - "def build_reaction_bigg(model, id, \n", - " reac_str:str = None, \n", - " references:dict={},\n", - " idprefix='refineGEMs',\n", - " namespace:Literal['BiGG']='BiGG'):\n", + "# @TODO BioCyc not implemeneted\n", + "# @TODO logging, save stuff for manual curation etc.\n", + "# @TEST - somewhat seems to work - for now\n", + "def add_reactions_from_table(model,missing_reac_table,\n", + " idprefix='refineGEMs',\n", + " namespace:Literal['BiGG']='BiGG'):\n", " \n", + " # reconstruct reactions\n", " # ---------------------\n", - " # check, if ID in model\n", - " # ---------------------\n", - " matches_found = [_.id for _ in model.reactions if 'bigg.reaction' in _.annotation.keys() and _.annotation['bigg.reaction']==id]\n", - " if len(matches_found) > 0:\n", - " return matches_found\n", - " \n", - " # -----------------------------\n", - " # otherwise, build new reaction\n", - " # -----------------------------\n", - " # create reaction object\n", - " new_reac = cobra.Reaction(create_random_id(model,'reac',idprefix))\n", - " \n", - " # get information from the database\n", - " bigg_reac_info = load_a_table_from_database(\n", - " f'SELECT * FROM bigg_reactions WHERE id = \\'{id}\\'',\n", - " query=True).iloc[0,:]\n", - " new_reac.name = bigg_reac_info['name']\n", - " \n", - " # add metabolites\n", - " # ---------------\n", - " reactants,products,comparts,rev = parse_reac_str(bigg_reac_info['reaction_string'],'BiGG')\n", - " \n", - " metabolites = {}\n", - " meta_counter = 0\n", - " # reconstruct reactants\n", - " for mid,factor in reactants.items():\n", - " tmp_meta = build_metabolite_bigg(mid,model,\n", - " namespace,\n", - " idprefix)\n", - " if tmp_meta:\n", - " metabolites[tmp_meta] = -1*factor\n", - " meta_counter += 1\n", - " else:\n", - " return None # not able to build reaction successfully\n", + " for idx,row in tqdm(missing_reac_table.iterrows(), \n", + " desc='Trying to add missing reacs',\n", + " total=missing_reac_table.shape[0]):\n", + " # build reaction\n", + " reac = None\n", + " match row['via']:\n", + " # MetaNetX\n", + " case 'MetaNetX':\n", + " reac = build_reaction_mnx(model,row['id'],\n", + " reac_str=row['equation'],\n", + " references={'ec-code':[row['ec-code']]},\n", + " idprefix=idprefix,\n", + " namespace=namespace) \n", + " # KEGG\n", + " case 'KEGG':\n", + " refs = row['references']\n", + " refs['ec-code'] = row['ec-code']\n", + " reac = build_reaction_kegg(model,row['id'],\n", + " reac_str=row['equation'],\n", + " references=refs,\n", + " idprefix=idprefix,\n", + " namespace=namespace)\n", + " # BiGG\n", + " case 'BiGG':\n", + " reac = build_reaction_bigg(model,row['id'],\n", + " references={'ec-code':[row['ec-code']]},\n", + " idprefix=idprefix,\n", + " namespace=namespace)\n", + " # BioCyc @TODO\n", + " case 'BioCyc':\n", + " reac = build_reaction_biocyc()\n", + " # Unknown database\n", + " case _:\n", + " mes = f'''Unknown database name for reaction reconstruction: {row[\"via\"]}\\n\n", + " Reaction will not be reconstructed.'''\n", + " warnings.warn(mes,UserWarning)\n", " \n", - " # reconstruct products\n", - " for mid,factor in products.items():\n", - " tmp_meta = build_metabolite_bigg(mid,model,\n", - " namespace,\n", - " idprefix)\n", - " if tmp_meta:\n", - " metabolites[tmp_meta] = factor\n", - " meta_counter += 1\n", + " # check output of reconstruction\n", + " # ------------------------------\n", + " # case 1: reconstruction was not possible\n", + " if not reac:\n", + " pass # nothing to do here\n", + " # case 2: reaction(s) found in model\n", + " elif isinstance(reac,list):\n", + " # add found names to the add_to_GPR column of the table\n", + " current_gpr = missing_reac_table.loc[idx,'add_to_GPR']\n", + " if not current_gpr:\n", + " missing_reac_table.at[idx,'add_to_GPR'] = reac\n", + " else:\n", + " missing_reac_table.at[idx,'add_to_GPR'] = list(set(reac + current_gpr))\n", + " # case 3: new reaction was generated\n", + " elif isinstance(reac,cobra.Reaction):\n", + " # validate reaction\n", + " if isreaction_complete(reac):\n", + " # add reaction to model (if validation succesful)\n", + " model.add_reactions([reac])\n", + " # add reaction ID to table under add_to_GPR\n", + " current_gpr = missing_reac_table.loc[idx,'add_to_GPR']\n", + " if not current_gpr:\n", + " missing_reac_table.at[idx,'add_to_GPR'] = reac.id\n", + " else:\n", + " current_gpr.append(reac.id)\n", + " missing_reac_table.at[idx,'add_to_GPR'] = list(set(current_gpr))\n", + " # case 4: should never occur\n", " else:\n", - " return None # not able to build reaction successfully\n", + " mes = f'Unknown return type for reac param. Please contact the developers.'\n", + " raise TypeError(mes)\n", " \n", - " # add metabolites to reaction\n", - " # @TODO: does it need some kind of try and error, if - for some highly unlikely reason - two newly generated ids are the same\n", - " new_reac.add_metabolites(metabolites)\n", - " \n", - " # set reversibility\n", - " if rev:\n", - " new_reac.bounds = (1000.0,1000.0)\n", - " else:\n", - " new_reac.bounds = (0.0,1000.0)\n", - " \n", - " # add annotations\n", - " # ---------------\n", - " # add SBOTerm\n", - " new_reac.annotation['sbo'] = 'SBO:0000167'\n", - " # add infos from BiGG\n", - " new_reac.annotation['bigg.reaction'] = [id]\n", - " _add_annotations_from_bigg_reac_row(bigg_reac_info, new_reac)\n", - " # add additional references from the parameter\n", - " _add_annotations_from_dict_cobra(references,new_reac)\n", - " \n", - " # add notes\n", - " # ---------\n", - " new_reac.notes['created with'] = 'refineGEMs GapFiller, BiGG'\n", - " \n", - " # match ID to namespace\n", - " # ---------------------\n", - " match_id_to_namespace(new_reac,namespace)\n", + " #@TODO\n", + " # save reactions, that could not be recontructed, for manual curation\n", + " manual_curation_reacs = missing_reac_table[missing_reac_table['add_to_GPR'].isnull()]\n", + " # return the updated table with successfully reconstructed reaction ids \n", + " # to enable adding the genes\n", + " missing_gprs = missing_reac_table[~missing_reac_table['add_to_GPR'].isnull()]\n", + " return missing_gprs\n", "\n", - " return new_reac\n", - " \n", - " \n", - "@implement\n", - "def build_reaction_biocyc():\n", - " pass\n", "\n", - "@implement\n", - "def build_reaction():\n", - " pass\n", "\n", - "# GapFiller functions\n", "# -------------------\n", "\n", - "@implement\n", - "def add_reactions_from_table():\n", - " pass" + "# since adding genes works better with libsbml, \n", + "# return the tabke with the newly added reaction IDs?\n", + "# or still add them here??? -> since this is specific \n", + "# for gapfilling, it does not matter" ] }, { "cell_type": "code", - "execution_count": 128, + "execution_count": 148, "metadata": {}, "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Trying to add missing reacs: 100%|██████████| 5/5 [00:04<00:00, 1.09it/s]\n" + ] + }, { "data": { "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
ec-codencbiproteinidequationreferenceis_transportviaadd_to_GPR
4862.4.1.-[WP_011274814.1]MNXR1173061 MNXM1033@MNXD1 + 1 MNXM1103285@MNXD1 + 1 MNX...rheaR:31315NoneMetaNetXrefineGEMsreacJ7KDH4
1692.3.1.-[WP_011274812.1, WP_011274824.1]MNXR1123031 MNXM1102789@MNXD1 + 1 MNXM735433@MNXD1 + 1 W...keggR:R08809NoneMetaNetXrefineGEMsreac55L771
4552.4.1.-[WP_011274814.1]MNXR1123111 MNXM1102128@MNXD1 + 1 MNXM6228@MNXD1 = 1 MNX...keggR:R08817NoneMetaNetXrefineGEMsreac5RBIFT
1101.14.99.-[WP_011274813.1]MNXR1801401 MNXM62080@MNXD1 + 1 WATER@MNXD1 = 1 MNXM9572...seedR:rxn32331NoneMetaNetXrefineGEMsreacY72BV4
\n", + "
" ], "text/plain": [ - "" + " ec-code ncbiprotein id \\\n", + "486 2.4.1.- [WP_011274814.1] MNXR117306 \n", + "169 2.3.1.- [WP_011274812.1, WP_011274824.1] MNXR112303 \n", + "455 2.4.1.- [WP_011274814.1] MNXR112311 \n", + "110 1.14.99.- [WP_011274813.1] MNXR180140 \n", + "\n", + " equation reference \\\n", + "486 1 MNXM1033@MNXD1 + 1 MNXM1103285@MNXD1 + 1 MNX... rheaR:31315 \n", + "169 1 MNXM1102789@MNXD1 + 1 MNXM735433@MNXD1 + 1 W... keggR:R08809 \n", + "455 1 MNXM1102128@MNXD1 + 1 MNXM6228@MNXD1 = 1 MNX... keggR:R08817 \n", + "110 1 MNXM62080@MNXD1 + 1 WATER@MNXD1 = 1 MNXM9572... seedR:rxn32331 \n", + "\n", + " is_transport via add_to_GPR \n", + "486 None MetaNetX refineGEMsreacJ7KDH4 \n", + "169 None MetaNetX refineGEMsreac55L771 \n", + "455 None MetaNetX refineGEMsreac5RBIFT \n", + "110 None MetaNetX refineGEMsreacY72BV4 " ] }, - "execution_count": 128, + "execution_count": 148, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "build_reaction_bigg(cobramodel, '13PPDH')" - ] - }, - { - "cell_type": "code", - "execution_count": 114, - "metadata": {}, - "outputs": [ - { - "ename": "KeyError", - "evalue": "'13PPDH'", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mKeyError\u001b[0m Traceback (most recent call last)", - "Cell \u001b[0;32mIn[114], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mcobramodel\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mreactions\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mget_by_id\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43m13PPDH\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m)\u001b[49m\n", - "File \u001b[0;32m~/miniconda3/envs/sprg/lib/python3.10/site-packages/cobra/core/dictlist.py:73\u001b[0m, in \u001b[0;36mDictList.get_by_id\u001b[0;34m(self, id)\u001b[0m\n\u001b[1;32m 71\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mget_by_id\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;28mid\u001b[39m: Union[Object, \u001b[38;5;28mstr\u001b[39m]) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m Object:\n\u001b[1;32m 72\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Return the element with a matching id.\"\"\"\u001b[39;00m\n\u001b[0;32m---> 73\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mlist\u001b[39m\u001b[38;5;241m.\u001b[39m\u001b[38;5;21m__getitem__\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_dict\u001b[49m\u001b[43m[\u001b[49m\u001b[38;5;28;43mid\u001b[39;49m\u001b[43m]\u001b[49m)\n", - "\u001b[0;31mKeyError\u001b[0m: '13PPDH'" - ] - } - ], - "source": [ - "cobramodel.reactions.get_by_id('13PPDH')" + "testcmod = cobramodel.copy()\n", + "addtestcase = testcase.sample(5)\n", + "add_reactions_from_table(testcmod,addtestcase)" ] }, { diff --git a/src/refinegems/__init__.py b/src/refinegems/__init__.py index cc9dc44..3843875 100644 --- a/src/refinegems/__init__.py +++ b/src/refinegems/__init__.py @@ -1,7 +1,8 @@ -__all__ = ['analysis','classes','curation','utility', 'cmd_access'] +__all__ = ['analysis','classes','curation','decorators','utility', 'cmd_access'] from . import analysis from . import classes from . import curation +from . import decorators from . import utility from . import cmd_access \ No newline at end of file diff --git a/src/refinegems/classes/gapfill.py b/src/refinegems/classes/gapfill.py index be4e742..0ccfec2 100644 --- a/src/refinegems/classes/gapfill.py +++ b/src/refinegems/classes/gapfill.py @@ -10,21 +10,22 @@ from abc import ABC, abstractmethod import cobra -import pandas as pd +import io import numpy as np +import pandas as pd +import re +import warnings +from bioservices.kegg import KEGG +from libsbml import Model as libModel from typing import Literal, Union -from libsbml import Model as libModel -from bioservices.kegg import KEGG -import io -import re from tqdm import tqdm tqdm.pandas() from ..curation.db_access.db import get_ec_from_ncbi, get_ec_via_swissprot from ..utility.io import load_a_table_from_database, parse_fasta_headers, parse_gff_for_cds -from ..utility.entities import get_model_reacs_or_metabs +from ..utility.entities import get_model_reacs_or_metabs, create_gp, create_gpr from ..curation.db_access.kegg import parse_KEGG_gene, parse_KEGG_ec ############################################################################ @@ -145,6 +146,9 @@ def __init__(self) -> None: #@abstractmethod #def load_gene_list(self): # pass + + # abstract methods + # ---------------- @abstractmethod def get_missing_genes(self, model): @@ -153,6 +157,9 @@ def get_missing_genes(self, model): @abstractmethod def get_missing_reacs(self,model): pass + + # finding the gaps + # ---------------- def _find_reac_in_model(self, model: cobra.Model, eccode:str, id:str, idtype:Literal['MetaNetX','KEGG','BiGG', 'BioCyc'], @@ -190,12 +197,88 @@ def _find_reac_in_model(self, model: cobra.Model, eccode:str, id:str, return None - # @abstractmethod - # def run(self,model): - # pass + + # actual "Filling" part + # --------------------- + + # @TODO logging? or remove self? + def add_genes_from_table(self,model:libModel, gene_table:pd.DataFrame) -> None: + """Create new GeneProduct for a table of genes in the format: + + | ncbiprotein | locus_tag | UniProt | ... | + + The dots symbolise additional columns, that can be passed to the function, + but will not be used by it. The other columns, except UniProt, are required. + + Args: + - model (libModel): + _description_ + - gene_table (pd.DataFrame): + The table with the genes to add. At least needs the columns + '','' and 'ec-code'. + """ + + # ncbiprotein | locus_tag | ... + # work on a copy to ensure input stays the same + gene_table = gene_table.copy() + # gene_table.drop(columns=['ec-code'],inplace=True) + + # create gps from the table and add them to the model + if 'UniProt' in gene_table.columns: + for idx,x in gene_table.iterrows(): + create_gp(model, x['ncbiprotein'], + locus_tag=x['locus_tag'], + uniprot=(x['UniProt'],True)) + else: + for idx,x in gene_table.iterrows(): + create_gp(model, x['ncbiprotein'], + locus_tag=x['locus_tag']) + + + # @TODO seems very ridgid, better ways to find the ids? + def add_gene_reac_associations_from_table(model:libModel, + reac_table:pd.DataFrame) -> None: + """Using a table with at least the columns 'ncbiprotein' + (containing e.g. NCBI protein identifier (lists), should be gene IDs in the model) + and 'add_to_GPR' (containing reactions identifier (lists)), add the gene IDs to the + GPRs of the corresponding reactions. + + Args: + - model (libModel): + The model loaded with libSBML. + - reac_table (pd.DataFrame): + The table containing at least the columns 'ncbiprotein' (gene IDs) and + 'add_to_GPR' (reaction IDs) + """ + + model_gene_ids = [_.getId() for _ in model.getPlugin(0).getListOfGeneProducts()] + + # get each unique ncbiprotein vs reaction mapping + reac_table = reac_table[['ncbiprotein','add_to_GPR']] + reac_table = reac_table.explode('ncbiprotein').explode('add_to_GPR') + reac_table.drop_duplicates(inplace=True) + + # add the genes to the corresponding GPRs + for idx,row in reac_table.iterrows(): + # check, if G_+ncbiprotein in model + # if yes, add gpr + geneid = 'G_'+row['ncbiprotein'].replace('.','_') + reacid = 'R_'+row['add_to_GPR'] + if geneid in model_gene_ids: + create_gpr(model.getReaction(reacid),geneid) + # else, print warning + else: + mes = f'Cannot find {geneid} in model. Should be added to {reacid}' + warnings.warn(mes,UserWarning) + + + def fill_model(self, model): pass + + # reporting + # --------- def calculate_stats(self): pass @@ -582,7 +665,11 @@ def get_missing_reacs(self): # def gff_gene_comp(): # pass # -# +# + +# --------------------------------- +# GapFilling with GFF and Swissprot +# --------------------------------- class GeneGapFiller(GapFiller): GFF_COLS = {'locus_tag':'locus_tag', diff --git a/src/refinegems/curation/db_access/db.py b/src/refinegems/curation/db_access/db.py index e817a83..267bb73 100644 --- a/src/refinegems/curation/db_access/db.py +++ b/src/refinegems/curation/db_access/db.py @@ -11,6 +11,7 @@ # requirements ################################################################################ +import cobra import numpy as np import pandas as pd import re @@ -97,6 +98,9 @@ def compare_ids(id1: str, id2: str) -> bool: return similar_ids +# BiGG +# ---- + @sleep_and_retry @limits(calls=10, period=1) def get_reaction_compartment(bigg_id: str) -> str: @@ -377,6 +381,9 @@ def get_reactants_and_products_dicts(reaction_id: str) -> list[dict]: return missing_reacs + # @TODO : name is an issue + + # NCBI parser # ----------- @@ -434,6 +441,7 @@ def filter_DIAMOND_blastp_results(blasttsv:str, pid_theshold:float=90.0): diamond_results = diamond_results[['query_ID','subject_ID']] return diamond_results + # Uniprot # ------- @@ -502,4 +510,110 @@ def get_ec_via_swissprot(fasta:str, db:str, missing_genes:pd.DataFrame, # EC numbers and locus tags mapped_res = mapped_res.groupby(['locus_tag','ec-code']).agg({'UniProt': lambda x: x.tolist()}).reset_index() - return mapped_res \ No newline at end of file + return mapped_res + + +# general +# ------- + +# @TODO: BioCyc missing +def parse_reac_str(equation:str, + type:Literal['BiGG','BioCyc','MetaNetX','KEGG']='MetaNetX') -> tuple[dict,dict,list,bool]: + """Parse a reaction string. + + Args: + - equation (str): + The equation of a reaction as a string (as saved in the database). + - type (Literal['BiGG','BioCyc','MetaNetX','KEGG'], optional): + The name of the database the equation was taken from. + Can be 'BiGG','BioCyc','MetaNetX','KEGG'. + Defaults to 'MetaNetX'. + + Returns: + tuple: + + Tuple of (1) dict, (2) dict, (3) list & (4) bool: + + 1. Dictionary with the reactant IDs and their stoichiometric factors. + 2. Dictionary with the product IDs and their stoichiometric factors. + 3. List of compartment IDs or None, if they cannot be extract from the equation. + 4. True, if the reaction is reversible, else False. + """ + + products = {} + reactants = {} + compartments = list() + is_product = False + reversible = True + + match type: + case 'MetaNetX': + for s in equation.split(' '): + # switch from reactants to products + if s == '=': + is_product = True + # found stoichiometric factor + elif s.isnumeric(): + factor = float(s) + # skip + elif s == '+': + continue + # found metabolite + else: + # get information from MetaNetX + metabolite, compartment = s.split('@') + compartments.append(compartment) + + if is_product: + products[metabolite] = factor + else: + reactants[metabolite] = factor + + case 'BiGG': + factor = 1.0 # BiGG does not use factor 1 in the quations + for s in equation.split(' '): + # found factor + if s.replace('.','').isdigit(): + factor = float(s) + # switch from reactants to products + elif s == '-->' : + is_product = True + reversible = False + elif s == '<->': + is_product = True + # skip + elif s == '+': + continue + # found metabolite + else: + compartments.append(s.rsplit('_',1)[1]) + if is_product: + products[s] = factor + else: + reactants[s] = factor + factor = 1.0 + + case 'KEGG': + compartments = None + factor = 1.0 + for s in equation.split(' '): + if s.isnumeric(): + factor = float(s) + elif s == '+': + continue + elif s == '<=>': # @TODO are there more options? + is_product = True + else: + if is_product: + products[s] = factor + else: + reactants[s] = factor + factor = 1.0 + + case 'BioCyc': + pass + + return (reactants,products,compartments,reversible) + + + \ No newline at end of file diff --git a/src/refinegems/curation/db_access/kegg.py b/src/refinegems/curation/db_access/kegg.py index b7fc738..7f4da7f 100644 --- a/src/refinegems/curation/db_access/kegg.py +++ b/src/refinegems/curation/db_access/kegg.py @@ -10,7 +10,7 @@ Due to the KEGG REST API this is relatively slow (model of size 1500 reactions - 20 min). """ -__author__ = "Famke Baeuerle" +__author__ = "Famke Baeuerle and Carolin Brune" ############################################################################ # requirements diff --git a/src/refinegems/developement/__init__.py b/src/refinegems/developement/__init__.py new file mode 100644 index 0000000..aa17fa2 --- /dev/null +++ b/src/refinegems/developement/__init__.py @@ -0,0 +1,3 @@ +__all__ = ['decorators'] + +from . import decorators \ No newline at end of file diff --git a/src/refinegems/developement/decorators.py b/src/refinegems/developement/decorators.py new file mode 100644 index 0000000..2e98394 --- /dev/null +++ b/src/refinegems/developement/decorators.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +"""This module contains decorator functions. +""" + +__author__ = "Carolin Brune" + +################################################################################ +# requirements +################################################################################ + +################################################################################ +# variables +################################################################################ + +################################################################################ +# functions +################################################################################ + +def template(func): + """A decorator for template functions. + + Template functions are not executable and are only for developers. + """ + def wrapper(): + raise RuntimeError('This function is a template for developers.\nIt cannot be executed.') + return wrapper + + +def implement(func): + """A decorator for functions, that should be implemented. + + Used to give a hint to other developers, that this function is not yet implemented, + but should be to make the program executable. + """ + def wrapper(): + raise NotImplementedError('The current function is just a placeholder and will be implement in the fucture.') + return wrapper \ No newline at end of file diff --git a/src/refinegems/utility/entities.py b/src/refinegems/utility/entities.py index 8db4cad..9b101a6 100644 --- a/src/refinegems/utility/entities.py +++ b/src/refinegems/utility/entities.py @@ -10,19 +10,28 @@ ################################################################################ import cobra +import json import pandas as pd -from random import choice import re +import requests +import urllib import warnings -from string import ascii_uppercase, digits from Bio import Entrez +from Bio.KEGG import REST, Compound from libsbml import Model as libModel -from libsbml import GeneProduct, Species, Reaction +from libsbml import GeneProduct, Species, Reaction, FbcOr, FbcAnd, GeneProductRef +from random import choice +from string import ascii_uppercase, digits from typing import Union, Literal from .cvterms import add_cv_term_genes, add_cv_term_metabolites, add_cv_term_reactions -from .io import search_ncbi_for_gpr +from .io import search_ncbi_for_gpr, load_a_table_from_database +from ..developement.decorators import * +# @TODO @TEST for import problems +# # -> afraid, this might cause a circular import (due to curation) +from ..curation.db_access.db import parse_reac_str +from ..curation.db_access.kegg import kegg_reaction_parser ################################################################################ # variables @@ -39,8 +48,12 @@ # functions ################################################################################ -# handling compartments - cobra -# ----------------------------- +# ++++++++++++++++++++++++++++++++++++++++ +# COBRApy - models +# ++++++++++++++++++++++++++++++++++++++++ + +# handling compartments +# --------------------- def are_compartment_names_valid(model:cobra.Model) -> bool: """Check if compartment names of model are considered valid based on VALID_COMPARTMENTS. @@ -90,8 +103,8 @@ def resolve_compartment_names(model:cobra.Model): raise KeyError(F'Unknown compartment {[_ for _ in model.compartments if _ not in COMP_MAPPING.keys()]} detected. Cannot resolve problem.') -# handling cobra entities -# ----------------------- +# handling cobra entities (features) +# ---------------------------------- def reaction_equation_to_dict(eq: str, model: cobra.Model) -> dict: """Parses a reaction equation string to dictionary @@ -274,6 +287,1115 @@ def match_id_to_namespace(model_entity:Union[cobra.Reaction, cobra.Metabolite], raise TypeError(mes) +# originally from SPECIMEN HQTB --- changed / extended +# @TODO - are all needed checks covered? any more needed? +def isreaction_complete(reac:cobra.Reaction, + name_check:bool=False, + formula_check:Literal['none','existence','wildcard','strict']='existence', + exclude_dna:bool=True, + exclude_rna:bool=True) -> bool: + + # check reaction features + # ----------------------- + # check name + if name_check: + if reac.id == '' or pd.isnull(reac.name): + return False + # check for RNA/DNA + if exclude_dna and 'DNA' in reac.name: + return False + if exclude_rna and 'RNA' in reac.name: + return False + + # check metabolites features + # -------------------------- + for m in reac.metabolites: + # check id + if m.id == '' or pd.isnull(m.id): + return False + # check name + if name_check and (m.name == '' or pd.isnull(m.name)): + return False + # check formula + match formula_check: + # case 1: no formula checking + case 'none': + pass + # case 2: check, if formula is set + case 'existence': + if m.formula == '' or pd.isnull(m.formula): + return False + # case 3: check, if formula is set and only contains alphanumerical characters + case 'wildcard': + if m.formula == '' or pd.isnull(m.formula) or not m.formula.isalnum(): + return False + # case 4: check, if formula is set, constains only alphanumerical characters and no rest R + case 'strict': + if m.formula == '' or pd.isnull(m.formula) or not m.formula.isalnum() or bool(re.search(r'R(?![a-z])', 'CH2HOROH')): + return False + # default case: Warning + no formula check + case _: + mes = f'Unknown options for formula_check: {formula_check}\nChecking the metabolite formula will be skipped.' + warnings.warn(mes,UserWarning) + + + return True + +# extending annotations +# --------------------- +# @TODO is this a good place for these functions or maybe somewhere else? + +# @TODO name of this function is not good +def get_BiGG_metabs_annotation_via_dbid(metabolite:cobra.Metabolite, + id:str, dbcol:str, + compartment:str = 'c') -> None: + """Search for a BiGG ID and add it to a metabolite annotation. + The search is based on a column name of the BiGG metabolite table + and an ID to search for. Additionally, using the given compartment name, + the found IDs are filtered for matching compartments. + + Args: + - metabolite (cobra.Metabolite): + The metabolite. Needs to a a COBRApy Metabolte object. + - id (str): + The ID to search for in the database. + - dbcol (str): + Name of the column of the database to check the ID against. + - compartment (str, optional): + The compartment name. Needs to be a valid BiGG compartment ID. + Defaults to 'c'. + """ + # check, if a BiGG annotation is given + if not 'bigg.metabolite' in metabolite.annotation.keys(): + # seach the database for the found id + bigg_search = load_a_table_from_database( + f'SELECT * FROM bigg_metabolites WHERE \'{dbcol}\' = \'{id}\'', + query=True) + if len(bigg_search) > 0: + # check, if the matches also match the compartment + metabolite.annotation['bigg.metabolite'] = [_ for _ in bigg_search['id'].tolist() if _.endswith(f'_{compartment}')] + # add final matches to the annotations of the metabolite + if len(metabolite.annotation['bigg.metabolite']) == 0: + metabolite.annotation.pop('bigg.metabolite') + + +def add_annotations_from_BiGG_metabs(metabolite:cobra.Metabolite) -> None: + """Check a cobra.metabolite for bigg.metabolite annotations. If they exists, + search for more annotations in the BiGG database and add them to the metabolite. + + Args: + - metabolite (cobra.Metabolite): + The metabolite object. + """ + if 'bigg.metabolite' in metabolite.annotation.keys(): + bigg_information = load_a_table_from_database( + 'SELECT * FROM bigg_metabolites WHERE id = \'' + f'\' OR id = \''.join(metabolite.annotation['bigg.metabolite']) + '\'', + query=True) + db_id_bigg = {'BioCyc':'biocyc', 'MetaNetX (MNX) Chemical':'metanetx.chemical','SEED Compound':'seed.compound','CHEBI':'chebi', 'KEGG Compound':'kegg.compound'} + for db in db_id_bigg: + info = list(set(bigg_information[db].dropna().to_list())) + if len(info) > 0: + info = ','.join(info) + info = [x.strip() for x in info.split(',')] # make sure all entries are a separate list object + if db_id_bigg[db] in metabolite.annotation.keys(): + metabolite.annotation[db_id_bigg[db]] = list(set(info + metabolite.annotation[db_id_bigg[db]])) + else: + metabolite.annotation[db_id_bigg[db]] = info + + +def _add_annotations_from_bigg_reac_row(row:pd.Series, reac:cobra.Reaction) -> None: + """Given a row of the BiGG reaction database table and a cobra.Reaction object, + extend the annotation of the latter with the information of the former. + + Args: + - row (pd.Series): + The row of the database table. + - reac (cobra.Reaction): + The reaction object. + """ + + dbnames = {'RHEA':'rhea','BioCyc':'biocyc','MetaNetX (MNX) Equation':'metanetx.reaction','EC Number':'ec-code'} + for dbname,dbprefix in dbnames.items(): + if row[dbname]: + ids_to_add = row[dbname].split(',') + if dbprefix in reac.annotation.keys(): + reac.annotation[dbprefix] = list(set(reac.annotation[dbprefix]).union(set(ids_to_add))) + else: + reac.annotation[dbprefix] = ids_to_add + + +def _add_annotations_from_dict_cobra(references:dict, entity:cobra.Reaction|cobra.Metabolite|cobra.Model) -> None: + """Given a dictionary and a cobra object, add the former as annotations to the latter. + The keys of the dictionary are used as the annotation labels, the values as the values. + If the keys are already in the entity, the values will be combined (union). + + Args: + - references (dict): + The dictionary with the references to add the entity. + - entity (cobra.Reaction | cobra.Metabolite | cobra.Model): + The entity to add annotations to. + """ + # add additional references from the parameter + for db,idlist in references.items(): + if not isinstance(idlist,list): + idlist = [idlist] + if db in entity.annotation.keys(): + entity.annotation[db] = list(set(entity.annotation[db] + idlist)) + else: + entity.annotation[db] = idlist + + +# adding metabolites to cobra models +# ---------------------------------- + +@template +def build_metabolite_xxx(id:str, model:cobra.Model, + namespace:str, + compartment:str, + idprefix:str) -> cobra.Metabolite: + """Template function for building a cobra.Metabolite. + + .. note:: + + This is a template function for developers. It is cannot be executed. + + Args: + - id (str): + _description_ + - model (cobra.Model): + _description_ + - namespace (str): + _description_ + - compartment (str): + _description_ + - idprefix (str): + _description_ + + Returns: + cobra.Metabolite: + _description_ + """ + # change xxx to database or way the metabolite will be reconstructed with + # check if id in model + # get information via id + # collection formation in a new metabolite object + # add more annotations from other databases + # adjust namespace + # check model again for new namespace + pass + + +# originally from SPECIMEN +# @TODO some issues left +# current version works on a couple of examples +def build_metabolite_mnx(id: str, model:cobra.Model, + namespace:str='BiGG', + compartment:str='c', + idprefix:str='refineGEMs') -> cobra.Metabolite | None: + """Build a cobra.Metabolite object from a MetaNetX ID. + This function will NOT directly add the metabolite to the model, + if the contruction is successful. + + Args: + - id (str): + A MetaNetX ID of a metabolite. + - model (cobra.Model): + The model, the metabolite will be build for. + - namespace (str, optional): + Name to use for the model ID. If namespace cannot be matched, + will use a random ID. + Defaults to 'BiGG'. + - compartment (str, optional): + Compartment of the metabolite. Defaults to 'c'. + - idprefix (str, optional): + Prefix for the random ID. Defaults to 'refineGEMs'. + + Returns: + Case construction successful or match found in model: + + cobra.Metabolite: + The metabolite object. + + Case construction failed: + + None: + Nothing to return. + """ + + # fast check if compound already in model + # ------------------------------------------ + # step 1: check if MetaNetX ID in model + matches = [x.id for x in model.metabolites if 'metanetx.chemical' in x.annotation and x.annotation['metanetx.chemical']==id and x.compartment == compartment] + + # step 2: if yes, retrieve metabolite from model + # case 1: multiple matches found + if len(matches) > 0: + if len(matches) > 1: + # ................ + # @TODO what to do + # currently, just the first one is taken + # ................ + match = model.metabolites.get_by_id(matches[0]) + # case 2: only one match found + else: + match = model.metabolites.get_by_id(matches[0]) + + # step 3: add metabolite + return match + + # if not, create new metabolite + # ----------------------------- + metabolite_prop = load_a_table_from_database(f'SELECT * FROM mnx_chem_prop WHERE id = \'{id}\'') + metabolite_anno = load_a_table_from_database(f'SELECT * FROM mnx_chem_xref WHERE id = \'{id}\'') + if len(metabolite_prop) == 0: # cannot construct metabolite + return None + else: + + # step 1: create a random metabolite ID + new_metabolite = cobra.Metabolite(create_random_id(model, 'meta', idprefix)) + + # step 2: add features + # -------------------- + new_metabolite.formula = metabolite_prop['formula'].iloc[0] + new_metabolite.name = metabolite_prop['name'].iloc[0] + new_metabolite.charge = metabolite_prop['charge'].iloc[0] + new_metabolite.compartment = compartment + + # step 3: add notes + # ----------------- + new_metabolite.notes['created with'] = 'refineGEMs GapFiller, metanetx.chemical' + + # step 4: add annotations + # ----------------------- + # add SBOTerm + new_metabolite.annotation['sbo'] = 'SBO:0000247' + + # add information directly available from the mnx_chem_prop table + new_metabolite.annotation['metanetx.chemical'] = [metabolite_prop['id'].iloc[0]] + if not pd.isnull(metabolite_prop['InChIKey'].iloc[0]): + new_metabolite.annotation['inchikey'] = metabolite_prop['InChIKey'].iloc[0].split('=')[1] + + # get more annotation from the mnx_chem_xref table + for db in ['kegg.compound','metacyc.compound','seed.compound','bigg.metabolite','chebi']: + db_matches = metabolite_anno[metabolite_anno['source'].str.contains(db)] + if len(db_matches) > 0: + new_metabolite.annotation[db] = [m.split(':',1)[1] for m in db_matches['source'].tolist()] + + # Cleanup BiGG annotations (MetaNetX only saves universal) + # @TODO : there is no guarantee, that the id with the specific compartment actually exists -> still do it? // kepp the universal id? + if 'bigg.metabolite' in new_metabolite.annotation.keys(): + new_metabolite.annotation['bigg.metabolite'] = [_+'_'+compartment for _ in new_metabolite.annotation['bigg.metabolite']] + else: + # if no BiGG was found in MetaNetX, try reverse search in BiGG + get_BiGG_metabs_annotation_via_dbid(new_metabolite, id, 'MetaNetX (MNX) Chemical', compartment) + + # add additional information from BiGG (if ID found) + add_annotations_from_BiGG_metabs(new_metabolite) + + # step 5: change ID according to namespace + # ---------------------------------------- + match_id_to_namespace(new_metabolite,namespace) + + # step 6: re-check existence of ID in model + # ----------------------------------------- + # @TODO : check complete annotations? + # - or let those be covered by the duplicate check later on? + if new_metabolite.id in [_.id for _ in model.metabolites]: + return model.metabolites.get_by_id(new_metabolite.id) + + return new_metabolite + + +# originally from SPECIMEN +# @TODO some issues left +# current version works on a couple of examples +def build_metabolite_kegg(kegg_id:str, model:cobra.Model, + namespace:Literal['BiGG']='BiGG', + compartment:str='c', + idprefix='refineGEMs') -> cobra.Metabolite | None: + """Build a cobra.Metabolite object from a KEGG ID. + This function will NOT directly add the metabolite to the model, + if the contruction is successful. + + Args: + - kegg_id (str): + A KEGG ID of a metabolite. + - model (cobra.Model): + The model, the metabolite will be build for. + - namespace (str, optional): + Name to use for the model ID. If namespace cannot be matched, + will use a random ID. + Defaults to 'BiGG'. + - compartment (str, optional): + Compartment of the metabolite. Defaults to 'c'. + - idprefix (str, optional): + Prefix for the random ID. Defaults to 'refineGEMs'. + + Returns: + Case construction successful or match found in model: + + cobra.Metabolite: + The build metabolite object. + + Case construction failed: + + None: + Nothing to return. + """ + + # --------------------------------------- + # fast check if compound already in model + # --------------------------------------- + # step 1: check via KEGG ID + matches = [x.id for x in model.metabolites if ('kegg.compound' in x.annotation and x.annotation['kegg.compound'] == kegg_id)] + if len(matches) > 0: + # step 2: model id --> metabolite object + # case 1: multiple matches found + if len(matches) > 1: + # ....... + # @TODO + # ....... + match = model.metabolites.get_by_id(matches[0]) + # case 2: only one match found + else: + match = model.metabolites.get_by_id(matches[0]) + + # step 3: add metabolite + return match + + # ----------------------------- + # if not, create new metabolite + # ----------------------------- + + # step 1: retrieve KEGG entry for compound + # ---------------------------------------- + try: + kegg_handle = REST.kegg_get(kegg_id) + kegg_record = [r for r in Compound.parse(kegg_handle)][0] + except urllib.error.HTTPError: + warnings.warn(F'HTTPError: {kegg_id}') + return None + except ConnectionResetError: + warnings.warn(F'ConnectionResetError: {kegg_id}') + return None + except urllib.error.URLError: + warnings.warn(F'URLError: {kegg_id}') + return None + + # step 2: create a random metabolite ID + # ------------------------------------- + new_metabolite = cobra.Metabolite(create_random_id(model, 'meta',idprefix)) + + # step 3: add features + # -------------------- + # set name from KEGG and additionally use it as ID if there is none yet + if isinstance(kegg_record.name, list): + # @TODO : better way to choose a name than to just take the first entry??? + new_metabolite.name = kegg_record.name[0] + else: + new_metabolite.name = kegg_record.name + # set compartment + new_metabolite.compartment = compartment + # set formula + new_metabolite.formula = kegg_record.formula + + # step 4: add notes + # ----------------- + new_metabolite.notes['created with'] = 'refineGEMs GapFiller, KEGG.compound' + + # step 5: add annotations + # ----------------------- + # add annotation from the KEGG entry + new_metabolite.annotation['kegg.compound'] = kegg_id + db_idtf = {'CAS':'cas','PubChem':'pubchem.compound','ChEBI':'chebi'} + for db,ids in kegg_record.dblinks: + if db in db_idtf: + new_metabolite.annotation[db_idtf[db]] = ids + + # add SBOTerm + new_metabolite.annotation['sbo'] = 'SBO:0000247' + + # search for infos in MetaNetX + # @TODO, since the table are readily available at the database now + mnx_info = load_a_table_from_database( + f'SELECT * FROM mnx_chem_xref WHERE source = \'kegg.compound:{kegg_id}\'', + query=True + ) + if len(mnx_info) > 0: + mnx_ids = list(set(mnx_info['id'])) + # mapping is unambiguously + if len(mnx_ids) == 1: + mnx_info = load_a_table_from_database( + f'SELECT * FROM mnx_chem_prop WHERE id = \'{mnx_ids[0]}\'', + query=True + ) + # add charge + new_metabolite.charge = mnx_info['charge'].iloc[0] + # add more annotations + new_metabolite.annotation['metanetx.chemical'] = [mnx_info['id'].iloc[0]] + if not pd.isnull(mnx_info['InChIKey'].iloc[0]): + new_metabolite.annotation['inchikey'] = mnx_info['InChIKey'].iloc[0].split('=')[1] + + # get more annotation from the mnx_chem_xref table + metabolite_anno = load_a_table_from_database(f'SELECT * FROM mnx_chem_xref WHERE id = \'{mnx_info["id"]}\'') + for db in ['kegg.compound','metacyc.compound','seed.compound','bigg.metabolite','chebi']: + db_matches = metabolite_anno[metabolite_anno['source'].str.contains(db)] + if len(db_matches) > 0: + mnx_tmp = [m.split(':',1)[1] for m in db_matches['source'].tolist()] + if db in new_metabolite.annotation.keys(): + new_metabolite.annotation[db] = list(set(mnx_tmp + new_metabolite.annotation[db])) + else: + new_metabolite.annotation[db] = mnx_tmp + + else: + pass + # @TODO : how to handle multiple matches, e.g. getting charge will be complicated + + # Cleanup BiGG annotations (MetaNetX only saves universal) + # @TODO : there is no guarantee, that the id with the specific compartment actually exists -> still do it? // kepp the universal id? + if 'bigg.metabolite' in new_metabolite.annotation.keys(): + new_metabolite.annotation['bigg.metabolite'] = [_+'_'+compartment for _ in new_metabolite.annotation['bigg.metabolite']] + + # if no BiGG ID, try reverse search + get_BiGG_metabs_annotation_via_dbid(new_metabolite, id, 'KEGG Compound', compartment) + + # search for annotations in BiGG + add_annotations_from_BiGG_metabs(new_metabolite) + + # step 6: change ID according to namespace + # ---------------------------------------- + match_id_to_namespace(new_metabolite,namespace) + + # step 7: re-check existence of ID in model + # ----------------------------------------- + # @TODO : check complete annotations? + # - or let those be covered by the duplicate check later on? + if new_metabolite.id in [_.id for _ in model.metabolites]: + return model.metabolites.get_by_id(new_metabolite.id) + + return new_metabolite + + +# @TEST some more, somewhat works, but who knows... +# @TODO some comments inside +# @NOTE expects the non-universal BiGG ID (meaning the one with the compartment abbreviation +# at the end) -> change behaviour or keep it? +def build_metabolite_bigg(id:str, model:cobra.Model, + namespace:Literal['BiGG']='BiGG', + idprefix:str='refineGEMs') -> cobra.Metabolite | None: + """Build a cobra.Metabolite object from a BiGG ID. + This function will NOT directly add the metabolite to the model, + if the contruction is successful. + + Args: + - id (str): + A BiGG ID of a metabolite. + - model (cobra.Model): + The model, the metabolite will be build for. + - namespace (str, optional): + Name to use for the model ID. If namespace cannot be matched, + will use a random ID. + Defaults to 'BiGG'. + - compartment (str, optional): + Compartment of the metabolite. Defaults to 'c'. + - idprefix (str, optional): + Prefix for the random ID. Defaults to 'refineGEMs'. + + Returns: + Case construction successful or match found in model: + + cobra.Metabolite: + The build metabolite object. + + Case construction failed: + + None: + Nothing to return. + """ + + + compartment = id.rsplit('_',1)[1] + # ------------------------------------------ + # fast check if compound already in model + # ------------------------------------------ + # step 1: check if MetaNetX ID in model + matches = [x.id for x in model.metabolites if 'bigg.metabolite' in x.annotation and (x.annotation['bigg.metabolite']==id or x.annotation['bigg.metabolite']==id.rsplit('_',1)[0]) and x.compartment == compartment] + # step 2: if yes, retrieve metabolite from model + # case 1: multiple matches found + if len(matches) > 0: + if len(matches) > 1: + # ................ + # @TODO what to do + # currently, just the first one is taken + # ................ + match = model.metabolites.get_by_id(matches[0]) + # case 2: only one match found + else: + match = model.metabolites.get_by_id(matches[0]) + + # step 3: add metabolite + return match + + # ----------------------------- + # if not, create new metabolite + # ----------------------------- + # get information from the database + bigg_res = load_a_table_from_database( + f'SELECT * FROM bigg_metabolites WHERE id = \'{id}\'', + query=True) + if len(bigg_res) > 0: + bigg_res = bigg_res.iloc[0,:] + else: + return None # not data = no recontruction + # get information from MNX if ID available + mnx_res=None + if bigg_res['MetaNetX (MNX) Chemical']: + mnx_res = load_a_table_from_database( + f'SELECT * FROM mnx_chem_prop WHERE id=\'{bigg_res["MetaNetX (MNX) Chemical"]}\'', + query=True) + if len(mnx_res) > 0: + mnx_res = mnx_res.iloc[0,:] + else: + mnx_res=None + + # step 1: create a random metabolite ID + # ------------------------------------- + new_metabolite = cobra.Metabolite(create_random_id(model, 'meta', idprefix)) + + # step 2: add features + # -------------------- + new_metabolite.name = bigg_res['name'] + new_metabolite.compartment = compartment + if mnx_res is not None: + if mnx_res['charge']: + new_metabolite.charge = mnx_res['charge'] + if mnx_res['formula']: + new_metabolite.formula = mnx_res['formula'] + + if not new_metabolite.formula or not new_metabolite.charge: + try: + bigg_fetch = json.loads(requests.get(f'http://bigg.ucsd.edu/api/v2/universal/metabolites/{id.rsplit("_",1)[0]}').text) + if 'formulae' in bigg_fetch.keys() and not new_metabolite.formula: + new_metabolite.formula = bigg_fetch['formulae'][0] + if 'charges' in bigg_fetch.keys() and new_metabolite.charge: + new_metabolite.charge = bigg_fetch['charges'][0] + except Exception as e: + # @TODO + pass + + # step 3: add notes + # ----------------- + new_metabolite.notes['created with'] = 'refineGEMs GapFiller, BiGG' + + # step 4: add annotations + # ----------------------- + # add SBOTerm + new_metabolite.annotation['sbo'] = 'SBO:0000247' + # add infos from BiGG + new_metabolite.annotation['bigg.metabolite'] = [id] # @TODO or use the universal id? + add_annotations_from_BiGG_metabs(new_metabolite) + # add annotations from MNX + if mnx_res is not None: + if mnx_res['InChI'] and 'inchi' not in new_metabolite.annotation.keys(): + new_metabolite.annotation['inchi'] = mnx_res['InChI'] + if mnx_res['InChIKey'] and 'inchikey' not in new_metabolite.annotation.keys(): + new_metabolite.annotation['inchikey'] = mnx_res['InChIKey'] + + # step 5: change ID according to namespace + # ---------------------------------------- + match_id_to_namespace(new_metabolite,namespace) + + # step 6: re-check existence of ID in model + # ----------------------------------------- + # @TODO : check complete annotations? + # - or let those be covered by the duplicate check later on? + if new_metabolite.id in [_.id for _ in model.metabolites]: + return model.metabolites.get_by_id(new_metabolite.id) + + return new_metabolite + + +@implement +def build_metabolite_biocyc(id:str, model:cobra.Model, + namespace:str, + compartment:str, + idprefix:str) -> cobra.Metabolite: + pass + + +# adding reactions cobra models +# ----------------------------- + +# TODO +# extend the build function so, that all of them can take either the id or an equation +# as input for rebuilding the reaction (would also be beneficial for semi-manual curation) + +@template +# @TODO complete it +def build_reaction_xxx(): + pass + + +# @TEST (more) - tries some cases, in which it seems to work +# @TODO +def build_reaction_mnx(model:cobra.Model, id:str, + reac_str:str = None, + references:dict={}, + idprefix='refineGEMs', + namespace:Literal['BiGG']='BiGG') -> cobra.Reaction | None | list: + """Construct a new reaction for a model from a MetaNetX reaction ID. + This function will NOT add the reaction directly to the model, if the + construction process is successful. + + Args: + - model (cobra.Model): + The model loaded with COBRApy. + - id (str): + A MetaNetX reaction ID. + - reac_str (str, optional): + The reaction equation string from the database. + Defaults to None. + - references (dict, optional): + Additional annotations to add to the reaction (idtype:[value]). + Defaults to {}. + - idprefix (str, optional): + Prefix for the pseudo-identifier. Defaults to 'refineGEMs'. + - namespace (Literal['BiGG'], optional): + Namespace to use for the reaction ID. + If namespace cannot be matched, uses the pseudo-ID + Defaults to 'BiGG'. + + Returns: + Case successful construction: + + cobra.Reaction: + The newly build reaction object. + + Case construction not possible: + + None: + Nothing to return. + + Case reaction found in model. + + list: + List of matching reaction IDs (in model). + """ + + # --------------------- + # check, if ID in model + # --------------------- + matches_found = [_.id for _ in model.reactions if 'metanetx.reaction' in _.annotation.keys() and _.annotation['metanetx.reaction']==id] + if len(matches_found) > 0: + return matches_found + + # ----------------------------- + # otherwise, build new reaction + # ----------------------------- + + # get relevant part of table from database + mnx_reac_refs = load_a_table_from_database( + f'SELECT * FROM mnx_reac_xref WHERE id = \'{id}\'', + query=True) + mnx_reac_refs = mnx_reac_refs[~(mnx_reac_refs['description']=='secondary/obsolete/fantasy identifier')] + + # create reaction object + new_reac = cobra.Reaction(create_random_id(model,'reac',idprefix)) + + # set name of reaction + name = '' + for desc in mnx_reac_refs['description']: + if '|' in desc: # entry has a name and an equation string + name = desc.split('|')[0] + break # one name is enough + new_reac.name = name + + # get metabolites + # --------------- + if not reac_str: + mnx_reac_prop = load_a_table_from_database( + f'SELECT * FROM mnx_reac_prop WHERE id = \'{id}\'', + query=True) + reac_str = mnx_reac_prop['mnx_equation'][0] + if mnx_reac_prop['ec-code'][0]: + references['ec-code'] = mnx_reac_prop['ec-code'][0] + + if reac_str: + reactants,products,comparts,rev = parse_reac_str(reac_str,'MetaNetX') + else: + return None + # ............................................................ + # @TODO / Issue + # reac_prop / mnx equation only saves generic compartments 1 and 2 (MNXD1 / MNXD2) + # how to get the (correct) compartment? + # current solution 1 -> c, 2 -> e + comparts = ['c' if _ == 'MNXD1' else 'e' for _ in comparts ] + # ............................................................ + metabolites = {} + meta_counter = 0 + + # reconstruct reactants + for mid,factor in reactants.items(): + tmp_meta = build_metabolite_mnx(mid,model, + namespace, + comparts[meta_counter],idprefix) + if tmp_meta: + metabolites[tmp_meta] = -1*factor + meta_counter += 1 + else: + return None # not able to build reaction successfully + + # reconstruct products + for mid,factor in products.items(): + tmp_meta = build_metabolite_mnx(mid,model, + namespace, + comparts[meta_counter],idprefix) + if tmp_meta: + metabolites[tmp_meta] = factor + meta_counter += 1 + else: + return None # not able to build reaction successfully + + # add metabolites to reaction + # @TODO: does it need some kind of try and error, if - for some highly unlikely reason - two newly generated ids are the same + new_reac.add_metabolites(metabolites) + + # set reversibility + if rev: + new_reac.bounds = (1000.0,1000.0) + else: + new_reac.bounds = (0.0,1000.0) + + # get annotations + # --------------- + new_reac.annotation['sbo'] = 'SBO:0000167' + # get more annotation from the mnx_reac_xref table + for db in ['bigg.reaction','kegg.reaction','seed.reaction','metacyc.reaction','rhea']: + db_matches = mnx_reac_refs[mnx_reac_refs['source'].str.contains(db)] + if len(db_matches) > 0: + new_reac.annotation[db] = [m.split(':',1)[1] for m in db_matches['source'].tolist()] + # update reactions direction, if MetaCyc has better information + if db == 'metacyc.reaction' and len(db_matches[db_matches['source'].str.contains('-->')]): + new_reac.bounds = (0.0,1000.0) + # add additional references from the parameter + _add_annotations_from_dict_cobra(references,new_reac) + + # get annotations + # --------------- + new_reac.annotation['sbo'] = 'SBO:0000167' + + # add notes + # --------- + new_reac.notes['created with'] = 'refineGEMs GapFiller, MetaNetX' + + # match ID to namespace + # --------------------- + match_id_to_namespace(new_reac,namespace) + + return new_reac + + +# @TEST (more) - tries some cases, in which it seems to work +# @TODO some things still open (for discussion) +def build_reaction_kegg(model:cobra.Model, id:str=None, reac_str:str=None, + references:dict={}, + idprefix='refineGEMs', + namespace:Literal['BiGG']='BiGG') -> None: + """Construct a new reaction for a model from either a KEGG reaction ID + or a KEGG equation string. + This function will NOT add the reaction directly to the model, if the + construction process is successful. + + Args: + - model (cobra.Model): + The model loaded with COBRApy. + - id (str,optional): + A KEGG reaction ID. + - reac_str (str, optional): + The reaction equation string from the database. + Defaults to None. + - references (dict, optional): + Additional annotations to add to the reaction (idtype:[value]). + Defaults to {}. + - idprefix (str, optional): + Prefix for the pseudo-identifier. Defaults to 'refineGEMs'. + - namespace (Literal['BiGG'], optional): + Namespace to use for the reaction ID. + If namespace cannot be matched, uses the pseudo-ID + Defaults to 'BiGG'. + + Returns: + Case successful construction: + + cobra.Reaction: + The newly build reaction object. + + Case construction not possible: + + None: + Nothing to return. + + Case reaction found in model. + + list: + List of matching reaction IDs (in model). + """ + + # either reaction id or a reaction string needed for reconstruction + if not id and not reac_str: + return None # reconstruction not possible + + # create an empty reaction with random id + new_reac = cobra.Reaction(create_random_id(model,'reac',idprefix)) + + # ------------- + # KEGG ID given + # ------------- + if id: + # check, if reaction in model + matches = [_.id for _ in model.reactions if 'kegg.reaction' in _.annotation.keys() and _.annotation['kegg.reaction']==id] + if len(matches) > 0: + return matches # return matched reaction ids in list + + # retrieve information from KEGG + kegg_info = kegg_reaction_parser(id) + if kegg_info: + if 'name' in kegg_info.keys(): + new_reac.name = kegg_info['name'] + if 'equation' in kegg_info.keys(): + reac_str = kegg_info['equation'] if not reac_str else reac_str + if 'db' in kegg_info.keys(): + new_reac.annotation = kegg_info['db'] + + # ------------------------------------- + # Reaction reconstruction from equation + # ------------------------------------- + + # skip, if not reaction string is available + if not reac_str: + return None # reconstruction not possible + + # parse reaction string + reactants,products,comparts,rev = parse_reac_str(reac_str,'KEGG') + + # .............................................. + # @TODO + # KEGG has no information about compartments !!! + # current solution: always use c + compartment = 'c' + # .............................................. + metabolites = {} + meta_counter = 0 + + # reconstruct reactants + for mid,factor in reactants.items(): + tmp_meta = build_metabolite_kegg(mid,model, + namespace, + compartment,idprefix) + if tmp_meta: + metabolites[tmp_meta] = -1*factor + meta_counter += 1 + else: + return None # not able to build reaction successfully + + # reconstruct products + for mid,factor in products.items(): + tmp_meta = build_metabolite_kegg(mid,model, + namespace, + compartment,idprefix) + if tmp_meta: + metabolites[tmp_meta] = factor + meta_counter += 1 + else: + return None # not able to build reaction successfully + + # add metabolites to reaction + # @TODO: does it need some kind of try and error, if - for some highly unlikely reason - two newly generated ids are the same + new_reac.add_metabolites(metabolites) + + # set reversibility + if rev: + new_reac.bounds = (1000.0,1000.0) + else: + new_reac.bounds = (0.0,1000.0) + + # -------------------- + # add more information + # -------------------- + new_reac.annotation['sbo'] = 'SBO:0000167' + # get more information from searching the KEGG ID in BiGG + bigg_res = load_a_table_from_database( + f'SELECT * FROM bigg_reactions WHERE \"KEGG Reaction\" = \'{id}\'', + query=True) + for idx,row in bigg_res.iterrows(): + r,p,compartments,r = parse_reac_str(row['reaction_string'],'BiGG') + # ......................................... + # @TODO part 2 of compartment issue + # find the reaction with 'c' as compartment + if len(set(compartments)) == 1 and compartments[0] == 'c': + new_reac.annotation['bigg.reaction'] = row['id'] + # @TODO add more information, exclude None entries + _add_annotations_from_bigg_reac_row(row, new_reac) + break + # ......................................... + + # @IDEA / @TODO get more information from MetaNetX + + # add additional references from the parameter + _add_annotations_from_dict_cobra(references,new_reac) + + # add notes + # --------- + new_reac.notes['created with'] = 'refineGEMs GapFiller, KEGG' + + # match ID to namespace + # --------------------- + match_id_to_namespace(new_reac,namespace) + + return new_reac + + +# @TEST +# @TODO some things still open (for discussion) +def build_reaction_bigg(model:cobra.Model, id:str, + reac_str:str = None, + references:dict={}, + idprefix='refineGEMs', + namespace:Literal['BiGG']='BiGG'): + """Construct a new reaction for a model from a BiGG reaction ID. + This function will NOT add the reaction directly to the model, if the + construction process is successful. + + Args: + - model (cobra.Model): + The model loaded with COBRApy. + - id (str): + A BiGG reaction ID. + - reac_str (str, optional): + The reaction equation string from the database. + Currently, this param is not doing anything in this function @TODO. + Defaults to None. + - references (dict, optional): + Additional annotations to add to the reaction (idtype:[value]). + Defaults to {}. + - idprefix (str, optional): + Prefix for the pseudo-identifier. Defaults to 'refineGEMs'. + - namespace (Literal['BiGG'], optional): + Namespace to use for the reaction ID. + If namespace cannot be matched, uses the pseudo-ID + Defaults to 'BiGG'. + + Returns: + Case successful construction: + + cobra.Reaction: + The newly build reaction object. + + Case construction not possible: + + None: + Nothing to return. + + Case reaction found in model. + + list: + List of matching reaction IDs (in model). + """ + + # --------------------- + # check, if ID in model + # --------------------- + matches_found = [_.id for _ in model.reactions if 'bigg.reaction' in _.annotation.keys() and _.annotation['bigg.reaction']==id] + if len(matches_found) > 0: + return matches_found + + # ----------------------------- + # otherwise, build new reaction + # ----------------------------- + # create reaction object + new_reac = cobra.Reaction(create_random_id(model,'reac',idprefix)) + + # get information from the database + bigg_reac_info = load_a_table_from_database( + f'SELECT * FROM bigg_reactions WHERE id = \'{id}\'', + query=True).iloc[0,:] + new_reac.name = bigg_reac_info['name'] + + # add metabolites + # --------------- + reactants,products,comparts,rev = parse_reac_str(bigg_reac_info['reaction_string'],'BiGG') + + metabolites = {} + meta_counter = 0 + # reconstruct reactants + for mid,factor in reactants.items(): + tmp_meta = build_metabolite_bigg(mid,model, + namespace, + idprefix) + if tmp_meta: + metabolites[tmp_meta] = -1*factor + meta_counter += 1 + else: + return None # not able to build reaction successfully + + # reconstruct products + for mid,factor in products.items(): + tmp_meta = build_metabolite_bigg(mid,model, + namespace, + idprefix) + if tmp_meta: + metabolites[tmp_meta] = factor + meta_counter += 1 + else: + return None # not able to build reaction successfully + + # add metabolites to reaction + # @TODO: does it need some kind of try and error, if - for some highly unlikely reason - two newly generated ids are the same + new_reac.add_metabolites(metabolites) + + # set reversibility + if rev: + new_reac.bounds = (1000.0,1000.0) + else: + new_reac.bounds = (0.0,1000.0) + + # add annotations + # --------------- + # add SBOTerm + new_reac.annotation['sbo'] = 'SBO:0000167' + # add infos from BiGG + new_reac.annotation['bigg.reaction'] = [id] + _add_annotations_from_bigg_reac_row(bigg_reac_info, new_reac) + # add additional references from the parameter + _add_annotations_from_dict_cobra(references,new_reac) + + # add notes + # --------- + new_reac.notes['created with'] = 'refineGEMs GapFiller, BiGG' + + # match ID to namespace + # --------------------- + match_id_to_namespace(new_reac,namespace) + + return new_reac + + +@implement +def build_reaction_biocyc(): + pass + + +@implement +# maybe for later, if we need something to work independantly from namespace +# and outside the gapfilling +def build_reaction(): + pass + + +# ++++++++++++++++++++++++++++++++++++++++ +# libSBML - models +# ++++++++++++++++++++++++++++++++++++++++ + # extracting reactions & Co via libsbml # ------------------------------------- @@ -361,7 +1483,25 @@ def get_model_reacs_or_metabs(model_libsbml: libModel, metabolites: bool=False, return reac_or_metab_list_df -# @TOCO check for new-ish functionalities / merge with create_gp and delete +def get_reversible(fluxes: dict[str: str]) -> bool: + """Infer if reaction is reversible from flux bounds + + Args: + - fluxes (dict): + Dictionary containing the keys 'lower_bound' & 'upper_bound' + with values in ['cobra_default_lb', 'cobra_0_bound', 'cobra_default_ub'] + + Returns: + bool: + True if reversible else False + """ + return (fluxes['lower_bound'] == 'cobra_default_lb') and (fluxes['upper_bound'] == 'cobra_default_ub') + + +# create model entities using libSBML +# ----------------------------------- + +# @TODO check for new-ish functionalities / merge with create_gp and delete def create_gpr_from_locus_tag(model: libModel, locus_tag: str, email: str) -> tuple[GeneProduct, libModel]: """Creates GeneProduct in the given model @@ -397,7 +1537,10 @@ def create_gpr_from_locus_tag(model: libModel, locus_tag: str, email: str) -> tu return gpr, model -def create_gp(model: libModel, model_id: str, name: str, locus_tag: str, protein_id: str) -> tuple[GeneProduct, libModel]: +# @TODO : check, if the function (in ths way), is still used anywhere and adjust to the +# new one +# @DEPRECATE +def old_create_gp(model: libModel, model_id: str, name: str, locus_tag: str, protein_id: str) -> tuple[GeneProduct, libModel]: """Creates GeneProduct in the given model Args: @@ -433,6 +1576,58 @@ def create_gp(model: libModel, model_id: str, name: str, locus_tag: str, protein return gp, model +# @NEW - substitues the one above +# --> WARNING: Output is different now +# @TODO generalise addition of references -> maybe kwargs +def create_gp(model:libModel, protein_id:str, + model_id:str=None, + name:str=None, locus_tag:str=None, + uniprot:tuple[list,bool]=None) -> None: + """Creates GeneProduct in the given libSBML model. + + Args: + - model (libModel): + The model object, loaded with libSBML. + - protein_id (str): + (NCBI) Protein ID of the gene. + - model_id (str, optional): + If given, uses this string as the ID of the gene in the model. + ID should be identical to ID that CarveMe adds from the NCBI FASTA input file. + Defaults to None. + - name (str, optional): + Name of the GeneProduct. Defaults to None. + - locus_tag (str, optional): + Genome-specific locus tag. Will be used as label in the model. + Defaults to None. + - uniprot (tuple[list,bool], optional): + Tuple of a list of UniProt IDs and a boolean for whether the strain is from the lab or + a database. Defaults to None. + """ + + # create gene product object + gp = model.getPlugin(0).createGeneProduct() + # set basic attributes + if model_id: # ID + gp.setIdAttribute(model_id) + else: + geneid = f'G_{protein_id}'.replace('.','_') # remove problematic signs + gp.setIdAttribute(geneid) + if name: gp.setName(name) # Name + if locus_tag: gp.setLabel(locus_tag) # Label + gp.setSBOTerm('SBO:0000243') # SBOterm + gp.setMetaId(f'meta_G_{protein_id}') # Meta ID + # test for NCBI/RefSeq + if re.fullmatch('^(((AC|AP|NC|NG|NM|NP|NR|NT|NW|WP|XM|XP|XR|YP|ZP)_\d+)|(NZ_[A-Z]{2,4}\d+))(\.\d+)?$', protein_id, re.IGNORECASE): + id_db = 'REFSEQ' + elif re.fullmatch('^(\w+\d+(\.\d+)?)|(NP_\d+)$', protein_id, re.IGNORECASE): id_db = 'NCBI' + if id_db: add_cv_term_genes(protein_id, id_db, gp) # NCBI protein + # add further references + # @TODO extend or generalise + if uniprot: + for uniprotid in uniprot[0]: + add_cv_term_genes(uniprotid, 'UNIPROT', gp, uniprot[1]) # UniProt + + def create_species( model: libModel, metabolite_id: str, name: str, compartment_id: str, charge: int, chem_formula: str ) -> tuple[Species, libModel]: @@ -475,21 +1670,6 @@ def create_species( return metabolite, model -def get_reversible(fluxes: dict[str: str]) -> bool: - """Infer if reaction is reversible from flux bounds - - Args: - - fluxes (dict): - Dictionary containing the keys 'lower_bound' & 'upper_bound' - with values in ['cobra_default_lb', 'cobra_0_bound', 'cobra_default_ub'] - - Returns: - bool: - True if reversible else False - """ - return (fluxes['lower_bound'] == 'cobra_default_lb') and (fluxes['upper_bound'] == 'cobra_default_ub') - - def create_reaction( model: libModel, reaction_id: str, name:str, reactants: dict[str: int], products: dict[str: int], fluxes: dict[str: str], reversible: bool=None, fast: bool=None, compartment: str=None, sbo: str=None, @@ -558,3 +1738,60 @@ def create_reaction( add_cv_term_reactions(reaction_id, 'BIGG', reaction) return reaction, model + +# @TODO : does it cover indeed all cases (for adding GPR together) ? +# @TODO : only support OR connection +def create_gpr(reaction:Reaction,gene:str|list[str]) -> None: + """For a given libSBML Reaction and a gene ID or a list of gene IDs, + create a gene production rule inside the reaction. + + Currently only supports 'OR' causality. + + Args: + - reaction (libsbml.Reaction): + The reaction object to add the GPR to. + - gene (str | list[str]): + Either a gene ID or a list of gene IDs, that will be added to the GPR + (OR causality). + """ + + # Step 1: test, if there is already a gpr + # --------------------------------------- + old_association_str = None + old_association_fbc = None + if reaction.getPlugin(0).getGeneProductAssociation(): + old_association = reaction.getPlugin(0).getGeneProductAssociation().getListOfAllElements() + # case 1: only a single association + if len(old_association) == 1 and isinstance(old_association[0],GeneProductRef): + old_association_str = old_association[0].getGeneProduct() + # case 2: nested structure of asociations + elif isinstance(old_association[0], FbcOr) or isinstance(old_association[0], FbcAnd): + old_association_fbc = old_association[0].clone() + # this should get the highest level association (that includes all others) + + + # Step 2: create new gene product association + # ------------------------------------------- + if old_association_str and isinstance(gene,str): + gene = [old_association_str,id] + elif old_association_str and isinstance(gene,list): + gene.append(old_association_str) + + # add the old association rule as an 'OR' (if needed) + if not old_association_fbc: + new_association = reaction.getPlugin(0).createGeneProductAssociation() + else: + new_association = reaction.getPlugin(0).createGeneProductAssociation().createOr() + new_association.addAssociation(old_association_fbc) + + # add the remaining genes + # @TODO currently, only connection possible is 'OR' + if isinstance(gene,str): + new_association.createGeneProductRef().setGeneProduct(gene) + elif isinstance(gene,list) and len(gene) == 1: + new_association.createGeneProductRef().setGeneProduct(gene[0]) + elif isinstance(gene,list) and len(gene) > 1: + gpa_or = new_association.createOr() + for i in gene: + gpa_or.createGeneProductRef().setGeneProduct(i) +