Source code for mgkit.taxon

"""
This module gives access to Uniprot taxonomy data. It also defines classes
to filter, order and group data by taxa
"""
from builtins import object
from functools import reduce
import logging
import itertools
import collections
import pickle
from tqdm import tqdm
from future.utils import viewitems, viewvalues
from .io import open_file


LOG = logging.getLogger(__name__)

ALGAE = {
    'haptophyceae': 2830,
    'chlorophyta': 3041,
    'stramenopiles': 33634,
    'cryptophyta': 3027,
    'rhodophyta': 2763,
}

# superkingdoms
BACTERIA = 2
ARCHAEA = 2157
VIRUS = 10239

CELLULAR_ORGANISMS = 131567

# kingdoms
FUNGI = 4751
VIRIDIPLANTAE = 33090
METAZOA = 33208
EUKARYOTA = 2759

PROTISTS = {
    # 'apicomplexa', alveolata
    # 'ciliophora', alveolata
    'alveolata': 33630,
    'amoebozoa': 554915,
    'apusozoa': 554296,
    'breviatea': 1401294,
    'centroheliozoa': 193537,
    'choanoflagellida': 28009,
    'diplomonadida': 5738,
    'euglenozoa': 33682,
    'formicata': 207245,
    'heterolobosea': 5752,
    'jakobida': 556282,
    'malawimonadidae': 136087,
    'oxymonadida': 66288,
    'parabasalia': 5719,
    'rhizaria': 543769,
}

PLANTS = {
    'streptophyta': 35493,
}

TAXON_ROOTS = (
    'archaea',  # 2157
    'bacteria',  # 2
    'fungi',  # 4751
    'metazoa',
    'environmental samples',
    'viruses',
    'viroids'
    'eukaryota',
    'other sequences',
    'unidentified'
) + tuple(PROTISTS) + tuple(PLANTS) + tuple(ALGAE)
"Root taxa used in analysis and filtering"

TAXON_RANKS = (
    'superkingdom',
    'kingdom',
    'phylum',
    'class',
    'subclass',
    'order',
    'family',
    'genus',
    'species'
)
"Taxonomy ranks included in the pickled data"

TaxonTuple = collections.namedtuple(
    'UniprotTaxonTuple',
    ('taxon_id', 's_name', 'c_name', 'rank', 'lineage', 'parent_id')
)
"""
A representation of a Uniprot Taxon
"""

# To be deprecated to avoid confusion
UniprotTaxonTuple = TaxonTuple


[docs]def parse_uniprot_taxon(line, light=True): """ .. versionchanged:: 0.1.13 now accepts empty scientific names, for root taxa .. versionchanged:: 0.2.1 added *light* parameter .. deprecated:: 0.4.0 Parses a Uniprot taxonomy file (tab delimited) line and returns a UniprotTaxonTuple instance. If *light* is True, lineage is not stored to decrease the memory usage. This is now the default. """ line = line.decode('ascii') line = line.rstrip().split('\t') taxon_id = int(line[0]) try: s_name = line[2].lower() except IndexError: s_name = '' try: c_name = line[3].lower() if line[3] else '' except IndexError: c_name = '' try: rank = line[7].lower() except IndexError: rank = None try: if light: raise IndexError lineage = tuple(x.lower() for x in line[8].split('; ')) except IndexError: lineage = (None,) try: parent_id = int(line[9]) except (ValueError, IndexError): parent_id = None return UniprotTaxonTuple( taxon_id, s_name, c_name, rank, lineage, parent_id )
[docs]def parse_ncbi_taxonomy_merged_file(file_handle): """ .. versionadded:: 0.2.3 Parses the *merged.dmp* file where the merged taxon_id are stored. Available at ftp://ftp.ncbi.nlm.nih.gov/pub/taxonomy/ Arguments: file_handle (str, file): file name or handle to the file Returns: dict: dictionary with merged_id -> taxon_id """ file_handle = open_file(file_handle, 'r') LOG.info( "Reading NCBI taxonomy merged file %s", getattr(file_handle, 'name', repr(file_handle)) ) merged_taxa = {} for line in file_handle: line = line.decode('ascii') merged_id, taxon_id = [col for col in line.strip().split('\t') if col != '|'] merged_taxa[int(merged_id)] = int(taxon_id) return merged_taxa
[docs]def parse_ncbi_taxonomy_names_file(file_handle, name_classes=('scientific name', 'common name')): """ .. versionadded:: 0.2.3 Parses the *names.dmp* file where the names associated to a taxon_id are stored. Available at ftp://ftp.ncbi.nlm.nih.gov/pub/taxonomy/ Arguments: file_handle (str, file): file name or handle to the file name_classes (tuple): name classes to save, only the scientific and common name are stored Returns: dict: dictionary with merged_id -> taxon_id """ file_handle = open_file(file_handle, 'r') LOG.info( "Reading NCBI taxonomy names file %s", getattr(file_handle, 'name', repr(file_handle)) ) taxa_names = {} for line in file_handle: line = line.decode('ascii') taxon_id, taxon_name, uniq_name, name_class = [ col.strip() for col in line.strip().split('\t') if col != '|' ] taxon_id = int(taxon_id) if name_class not in name_classes: continue if taxon_id not in taxa_names: taxa_names[taxon_id] = {} taxa_names[taxon_id][name_class] = taxon_name return taxa_names
[docs]def parse_ncbi_taxonomy_nodes_file(file_handle, taxa_names=None): """ .. versionadded:: 0.2.3 Parses the *nodes.dmp* file where the nodes of the taxonomy are stored. Available at ftp://ftp.ncbi.nlm.nih.gov/pub/taxonomy/. Arguments: file_handle (str, file): file name or handle to the file taxa_names (dict): dictionary with the taxa names (returned from :func:`parse_ncbi_taxonomy_names_file`) Yields: TaxonTuple: TaxonTuple instance """ file_handle = open_file(file_handle, 'r') LOG.info( "Reading NCBI taxonomy nodes file %s", getattr(file_handle, 'name', repr(file_handle)) ) for line in file_handle: line = line.decode('ascii') line = [col for col in line.strip().split('\t') if col != '|'] taxon_id = int(line[0]) parent_id = int(line[1]) if parent_id == 1: # NCBI uses 1 as the root, but the rest of the functions expect None parent_id = None rank = line[2].lower() s_name = '' c_name = '' if taxa_names is not None: try: names = taxa_names[taxon_id] except KeyError: names = {} s_name = names.get('scientific name', '') c_name = names.get('common name', '') yield TaxonTuple( taxon_id, s_name, c_name, rank, (None,), # lineage is not found in the NCBI taxonomy dump parent_id )
[docs]class Taxonomy(object): """ Class that contains the whole Uniprot taxonomy. Defines some methods to easy access of taxonomy. Follows the conventions of NCBI Taxonomy. Defines: * methods to load taxonomy from a pickle file or a generic file handle * can be iterated over and returns a generator its UniprotTaxon instances * can be used as a dictionary, in which the key is a taxon_id and the value is its UniprotTaxon instance """ def __init__(self, fname=None): """ :param str fname: file name of the pickled data. """ self._taxa = {} self._name_map = {} self._alt_map = {} if fname: self.load_data(fname)
[docs] @staticmethod def parse_phylophlan_lineage(lineage, sep='|', field_sep='\t', id_col=10, name_col=9): """ .. versionadded:: 0.5.7 Parses a line from PhyloPhlan 3 taxonomy Arguments: lineage (str): line of PhyloPhlan 3 taxonomy sep (str): separator for the taxa string field_sep (str): field separator id_col (int): index of the column with NCBI IDs name_col (int): index of the column with the lineage Returns: list: list of dictionaries, with values that can be used with :class:`TaxonTuple` """ fields = lineage.strip().split(field_sep) ranks = dict(k='superkingdom', f='family', p='phylum', o='order', g='genus', s='species', c='class', t='strain') lineage = fields[name_col].split(sep) taxon_ids = [int(taxon_id) if taxon_id else None for taxon_id in fields[id_col].split(sep)] if len(lineage) == (len(taxon_ids) + 1): taxon_ids.append(None) elif len(lineage) != len(taxon_ids): LOG.error("Fields have unexpected length: lineage %d, taxon_ids %d", len(lineage), len(taxon_ids)) taxa_info = [] for idx, (taxon_id, taxon_name) in enumerate(zip(taxon_ids, lineage)): # but split into rank/name rank, s_name = taxon_name.split('__', 1) s_name = s_name.replace('_', ' ') taxon_info = dict( # as defined in the taxonomy c_name=taxon_name, s_name=s_name, rank=ranks[rank], taxon_id=taxon_id, # additional information columns (Unknown, ID) lineage=(None,) if idx < (len(taxon_ids) - 1) else (fields[7], fields[1]), parent_id=None if idx == 0 else taxon_ids[idx - 1] ) taxa_info.append(taxon_info) return taxa_info
[docs] def read_from_phylophlan_taxonomy(self, file_name, field_sep='\t'): """ .. versionadded:: 0.5.7 Parses a PhyloPhlan 3 taxonomy. NCBI IDs will be kept, new IDs will be negative to avoid confusion with NCBI IDs. Arguments: file_name (str): file name of the taxonomy field_sep (str): field separator in the file """ LOG.info("Reading taxonomy from %s", file_name) name_cache = {} for line in tqdm(open_file(file_name, 'rb')): line = line.decode('ascii') if line.startswith('#'): continue taxa_info = self.parse_phylophlan_lineage( line, field_sep=field_sep) # Adds the taxa with a taxon_id set for idx, taxon_info in enumerate(taxa_info): taxon_id = taxon_info['taxon_id'] # It's a NCBI taxon_id if taxon_id is None: try: # Check in cache first taxon_id = name_cache[(taxon_info['s_name'], taxon_info['rank'])] except KeyError: # New ID taxon_id = self.min_id - 1 # needs to set parent_id for non superkingdom if (taxon_info['parent_id'] is None) and (taxon_info['rank'] != 'superkingdom'): parent_id = name_cache[ (taxa_info[idx - 1]['s_name'], taxa_info[idx - 1]['rank']) ] taxon_info['parent_id'] = parent_id if (taxon_id in self) and (taxon_info['lineage'] == (None,)): # No need to add, any parent_id will be correct continue else: # adds it to the taxonomy self[taxon_id] = TaxonTuple( taxon_id, taxon_info['s_name'], taxon_info['c_name'], taxon_info['rank'], taxon_info['lineage'], taxon_info['parent_id'] ) name_cache[(taxon_info['s_name'], taxon_info['rank'])] = taxon_id
[docs] def gen_alt_map(self): for taxon in self: if taxon.lineage != (None,): self._alt_map[taxon.lineage] = taxon.taxon_id
[docs] def get_by_lineage(self, lineage): """ .. versionadded:: 0.5.7 Returns the taxon_id provided an ID stored in the `lineage` attribute of the Taxon """ if not self._alt_map: self.gen_alt_map() if isinstance(lineage, str): lineage = tuple(lineage.split('_')) return self._alt_map.get(lineage, None)
[docs] @staticmethod def parse_gtdb_lineage(lineage, sep=';'): """ .. versionadded:: 0.3.3 Parse a GTDB lineage, one that defines the rank as a single letter, followed by **__** for each taxon name. Taxa are separated by semicolon by default. Also the **domain** rank is renamed into **superkingdom** to allow mixing of taxonomies. Returns: dict: dictionary with the parsed lineage, which can be passed to :meth:`Taxonomy.add_lineage` """ lineage_dict = {} ranks = dict(d='superkingdom', f='family', p='phylum', o='order', g='genus', s='species', c='class') for taxon_name in lineage.split(sep): rank, taxon_name = taxon_name.split('__') if not taxon_name: continue lineage_dict[ranks[rank]] = taxon_name return lineage_dict
[docs] def read_from_gtdb_taxonomy(self, file_handle, use_gtdb_name=True, sep='\t'): """ .. versionadded:: 0.3.0 .. versionchanged:: 0.3.1 replaced *domain* with *superkingdom* to support *get_lineage* Reads a GTDB taxonomy file (tab separated genome_id/taxonomy) and populate the taxonomy instance. The method also return a dictionary of genome_id -> taxon_id. Arguments: file_handle (file): file with the taxonomy use_gtdb_name (bool): if True, the names are kept as-is in the *s_name* attribute of :class:`TaxonTuple` and the "cleaned" version in *c_name* (e.g. f__Ammonifexaceae -> Ammonifexaceae). If False, the values are switched sep (str): separator between the columns of the file Returns: dict: dictionary of genome_id -> taxon_id, reflecting the created taxonomy .. note:: the taxon_id are generated, so there's no guarantee they will be the same in a successive execution """ if isinstance(file_handle, str): file_handle = open_file(file_handle, 'r') LOG.info( "Reading GTDB taxonomy from file (%s)", getattr(file_handle, 'name', repr(file_handle)) ) genome_ids = {} # ranks used in GTDB ranks = { 'g': 'genus', 'f': 'family', 'p': 'phylum', 'o': 'order', 'd': 'superkingdom', 's': 'species', 'c': 'class' } taxon_ids = {} count = 1 for line in file_handle: line = line.decode('ascii') # expecting the table to be the exported file from GTDB try: # print line.strip().split(sep) genome_id, line = line.strip().split(sep) # in case there's no gtdb taxonomy except ValueError: continue line = [ taxon_name.strip() for taxon_name in line.split(';') if len(taxon_name) > 3 ] if not line: continue # if no parent, use None as the NCBI taxonomy parent_id = None for taxon_name in line: # register a taxo_id for unknown taxon if taxon_name not in taxon_ids: taxon_ids[taxon_name] = count count += 1 taxon_id = taxon_ids[taxon_name] try: rank = ranks[taxon_name[0]] except KeyError: raise KeyError # keep the full gtdb name in the common name # cut the scientific name to remove the rank information common_name = taxon_name taxon_name = taxon_name[3:] # but if th use_gtdb_name option is True, switch them if use_gtdb_name: taxon_name, common_name = common_name, taxon_name self[taxon_id] = TaxonTuple( taxon_id, taxon_name, common_name, rank, (None,), parent_id ) parent_id = taxon_id genome_ids[genome_id] = taxon_id return genome_ids
[docs] def read_from_ncbi_dump(self, nodes_file, names_file=None, merged_file=None): """ .. versionadded:: 0.2.3 Uses the *nodes.dmp* and optionally *names.dmp*, *merged.dmp* files from ftp://ftp.ncbi.nlm.nih.gov/pub/taxonomy/ to populate the taxonomy. Arguments: nodes_file (str, file): file name or handle to the file names_file (str, file, None): file name or handle to the file, if None, names won't be added to the taxa merged_file (str, file, None): file name or handle to the file, if None, pointers to merged taxa won't be added """ LOG.info("Reading NCBI taxonomy from file") if names_file is not None: taxa_names = parse_ncbi_taxonomy_names_file(names_file) else: taxa_names = None for taxon in parse_ncbi_taxonomy_nodes_file(nodes_file, taxa_names): self[taxon.taxon_id] = taxon if merged_file is not None: merged_taxa = parse_ncbi_taxonomy_merged_file(merged_file) for merged_id, taxon_id in viewitems(merged_taxa): self[merged_id] = self[taxon_id]
[docs] def read_taxonomy(self, f_handle, light=True): """ .. versionchanged:: 0.2.1 added *light* parameter .. deprecated:: 0.4.0 use :meth:`Taxonomy.read_from_ncbi_dump` Reads taxonomy from a file handle. The file needs to be a tab separated format return by a query on Uniprot. If *light* is True, lineage is not stored to decrease the memory usage. This is now the default. New taxa will be added, duplicated taxa will be skipped. :param handle f_handle: file handle of the taxonomy file. """ try: LOG.info("Loading taxonomy from file %s", f_handle.name) except AttributeError: pass f_handle.readline() for line in f_handle: try: # taxon = UniprotTaxon(line=line) taxon = parse_uniprot_taxon(line, light=light) except Exception: # print line.strip().split('\t') LOG.debug("skipped line: %s", line) continue if taxon.taxon_id in self: continue self[taxon.taxon_id] = taxon
[docs] def gen_name_map(self): """ .. versionchanged:: 0.2.3 names are stored in the mapping as lowercase Generate a name map, where to each scientific name in the taxonomy an id is associated. """ LOG.debug('Generate name map') self._name_map = {} for taxon_obj in self: s_name = taxon_obj.s_name.lower() try: self._name_map[s_name].append(taxon_obj.taxon_id) except KeyError: self._name_map[s_name] = [taxon_obj.taxon_id]
[docs] def load_data(self, file_handle): """ .. versionchanged:: 0.2.3 now can use read *msgpack* serialised files .. versionchanged:: 0.1.13 now accepts file handles and compressed files (if file names) Loads serialised data from file name "file_handle" and accepts compressed files. if the *.msgpack* string is found in the file name, the *msgpack* package is used instead of pickle Arguments: file_handle (str, file): file name to which save the instance data """ if isinstance(file_handle, str): file_handle = open_file(file_handle, 'rb') LOG.info("Loading taxonomy from file %s", file_handle.name) if '.msgpack' in file_handle.name: import msgpack merged = [] for taxon_id, taxon in msgpack.Unpacker(file_handle, use_list=False, raw=False): taxon = TaxonTuple(*taxon) # if it's a merged taxon_id keep it and don't add it if taxon_id != taxon.taxon_id: merged.append((taxon_id, taxon.taxon_id)) else: self[taxon_id] = taxon # the list of merged ids is iterated over and pointers are added to # the correct ids for merged_id, taxon_id in merged: self[merged_id] = self[taxon_id] else: self._taxa = pickle.load(file_handle)
[docs] def save_data(self, file_handle): """ .. versionchanged:: 0.2.3 now can use *msgpack* to serialise Saves taxonomy data to a file handle or file name, can write compressed data if the file ends with ".gz", ".bz2" if the *.msgpack* string is found in the file name, the *msgpack* package is used instead of pickle Arguments: file_handle (str, file): file name to which save the instance data """ if isinstance(file_handle, str): file_handle = open_file(file_handle, 'wb') LOG.info("Saving taxonomy to file %s", file_handle.name) if '.msgpack' in file_handle.name: import msgpack for taxon_id, taxon in viewitems(self._taxa): file_handle.write( msgpack.packb((taxon_id, taxon)) ) else: pickle.dump(self._taxa, file_handle, -1)
[docs] def find_by_name(self, s_name, rank=None, strict=True): """ .. versionchanged:: 0.2.3 the search is now case insensitive .. versionchanged:: 0.3.1 added *rank* and *strict* parameter Returns the taxon IDs associated with the scientific name provided Arguments: s_name (str): the scientific name rank (str, None): return only a taxon_id of a specific rank strict (book): if True and *rank* is not None, KeyError will be raised if multiple taxa have the same name and rank Returns: list: a reference to the list of IDs that have for *s_name*, if *rank* is None. If *rank* is not None and one taxon is found, its taxon_id is returned, or None if no taxon is found. If *strict* is True and *rank* is not None, the set of taxon_ids found is resturned. Raises: KeyError: If multiple taxa are found, a *KeyError* exception is raised. """ if not self._name_map: self.gen_name_map() if rank is None: return list(set(self._name_map[s_name.lower()])) else: taxon_ids = [ taxon_id for taxon_id in self._name_map.get(s_name.lower(), []) if self[taxon_id].rank == rank ] if len(set(taxon_ids)) > 1: if strict: raise KeyError( "More than one taxon ({}) with rank {}".format( s_name, rank ) ) else: return set(taxon_ids) elif len(taxon_ids) == 0: return None else: return taxon_ids[0]
[docs] def is_ancestor(self, leaf_id, anc_ids): """ .. versionchanged:: 0.1.13 now uses :func:`is_ancestor` and changed behavior Checks if a taxon is the leaf of another one, or a list of taxa. :param int leaf_id: leaf taxon id :param int anc_ids: ancestor taxon id(s) :return bool: True if the ancestor taxon is in the leaf taxon lineage """ if isinstance(anc_ids, int): anc_ids = [anc_ids] for anc_id in anc_ids: if is_ancestor(self, leaf_id, anc_id): return True return False
[docs] def get_ranked_taxon(self, taxon_id, rank=None, ranks=TAXON_RANKS, roots=False): """ .. versionchanged:: 0.1.13 added *roots* argument Traverse the branch of which the *taxon* argument is the leaf backward, to get the specific rank to which the *taxon* belongs to. .. warning:: the *roots* options is kept for backward compatibility and should be be set to *False* :param taxon_id: id of the taxon or instance of :class:`UniprotTaxon` :param str rank: string that specify the rank, if None, the first valid rank will be searched. (i.e. the first with a value different from '') :param ranks: tuple of all taxonomy ranks, default to the default module value :param bool roots: if True, uses :data:`TAXON_ROOTS` to solve the root taxa :return: instance of :class:`TaxonTuple` for the rank found. """ if isinstance(taxon_id, int): ranked = self[taxon_id] else: raise ValueError("Not a valid taxon_id: {0}".format(taxon_id)) while True: if (rank is None) and (ranked.rank != ''): break elif (ranked.rank == rank): break elif (ranked.rank in ranks) and \ (ranks.index(ranked.rank) < ranks.index(rank)): break elif ranked.parent_id is None: break # kept only for backward compatibility. # needs a check to other scripts elif roots and (ranked.s_name in TAXON_ROOTS): break ranked = self[ranked.parent_id] return ranked
[docs] def get_ranked_id(self, taxon_id, rank=None, it=False, include_higher=True): """ .. versionadded:: 0.3.4 Gets the ranked taxon of another one. Useful when it's better to get a taxon_id instead of an instance of :class:`TaxonTuple`. Internally, it relies on :meth:`Taxonomy.get_ranked_taxon`. Arguments: taxon_id (int): taxon_id rank (str or None): passed over it (bool): determines the return value. if True, a list is returned include_higher (bool): if True, any rank higher than the requested may be returned. If False and the rank cannot be returned, None is returned Returns: int or list: The type returned is based on the *it* paramenter. If *it* is True, the return value is a list with the *taxon_id* of the ranked taxon as the sole value. If False, the returned value is the *taxon_id*. *include_higher* determines if the return value should be *None* if the exact rank was not found and *include_higher* is False """ taxon = self.get_ranked_taxon(taxon_id, rank) if (taxon.rank != rank) and (not include_higher): taxon = None else: taxon = taxon.taxon_id if it: taxon = [taxon] return taxon
[docs] def is_ranked_below(self, taxon_id, rank, equal=True): """ .. versionadded:: 0.4.0 Tests if a taxon_id is below the requested rank. Arguments: taxon_id (int): taxo_id to test rank (str): rank requested equal (bool): determines if the taxon_id tested may be of the requested rank Returns: bool: If the passed *taxon_id* is below the requested rank, it returns *True*. If *taxon_id* is of the rank requested and *equal* is True, the return value is *True*, if *equal* is False the return value is *False*. The return value is *False* otherwise. """ if (self[taxon_id].rank == rank) and equal: return True taxon_id = self[taxon_id].parent_id while (taxon_id is not None): if self[taxon_id].rank == rank: return True else: taxon_id = self[taxon_id].parent_id return False
[docs] def get_name_map(self): "Returns a taxon_id->s_name dictionary" return dict( (taxon.taxon_id, taxon.s_name) for taxon in self )
[docs] def get_lineage(self, taxon_id, names=False, only_ranked=True, with_last=True, **kwd): """ .. versionadded:: 0.3.1 Proxy for :func:`get_lineage`, with changed defaults. Other keyword arguments are passed to :func:`get_lineage` Arguments: taxon_id (int): taxon_id to return the lineage names (bool): if the elements of the list are converted into the scientific names only_ranked (bool): only return the ranked taxa with_last (bool): include the taxon_id passed to the list Returns: list: the lineage of the passed taxon_id as a list of IDs or names """ return get_lineage( self, taxon_id, names=names, only_ranked=only_ranked, with_last=with_last, **kwd )
[docs] def get_lineage_string(self, taxon_id, only_ranked=True, with_last=True, sep=';', rank=None, **kwd): """ .. versionadded:: 0.3.3 Generates a lineage string, with the possibility of getting another ranked taxon (via :meth:`Taxonomy.get_ranked_taxon`) to another rank, such as *phylum*. Other keyword arguments are passed to called functions Arguments: taxon_id (int): taxon_id to return the lineage only_ranked (bool): only return the ranked taxa with_last (bool): include the taxon_id passed to the list sep (str): separator used to join the lineage string rank (int or None): if None the full lineage is returned, otherwise the lineage will be cut to the specified rank Returns: str: lineage string """ if rank is not None: taxon_id = self.get_ranked_taxon(taxon_id, rank=rank).taxon_id return sep.join( self.get_lineage( taxon_id, only_ranked=only_ranked, with_last=with_last, names=True, **kwd ) )
[docs] def add_taxon(self, taxon_name, common_name='', rank='no rank', parent_id=None, lineage=(None,)): """ .. versionchanged:: 0.5.7 added *lineage* and made new taxon_id negatives .. versionadded:: 0.3.1 Adds a taxon to the taxonomy. If a taxon with the same name and rank is found, its taxon_id is returned, otherwise a new taxon_id is returned. Arguments: taxon_name (str): scientific name of the taxon common_name (str): common name rank (str): rank, defaults to 'no rank' parent_id (int): taxon_id of the parent, defaults to *None*, which is the taxonomy root lineage (tuple): lineage attribute in :class:`TaxonTuple` Returns: int: the taxon_id of the added taxon (if new), or the taxon_id of the taxon with the same name and rank found in the taxonomy Raises: KeyError: if more than one taxon has already the passed name and rank and it can't be resolved by looking at the parent_id passed, the exception is raised. """ # Tries to find it in the name map taxon_id = self.find_by_name(taxon_name, rank=rank, strict=False) # Multiple taxa with the same name, check if the parent_id of any of # them can be resolved if isinstance(taxon_id, set): # print taxon_id taxon_ids = taxon_id taxon_id = None for t_id in taxon_ids: if self[t_id].parent_id == parent_id: taxon_id = t_id if taxon_id is None: raise KeyError( "Multiple taxa ({}, {}) are in the taxonomy but cannot be associated with the parent_id ({}) provided".format( taxon_name, rank, parent_id ) ) if (taxon_id is None) or (self[taxon_id].parent_id != parent_id): taxon_id = self.min_id - 1 self[taxon_id] = TaxonTuple( taxon_id, taxon_name, common_name, rank, lineage, parent_id ) # adds the new name to the name map (which is initialised by # find_by_name) try: self._name_map[taxon_name.lower()].append(taxon_id) except KeyError: self._name_map[taxon_name.lower()] = [taxon_id] return taxon_id
[docs] def add_lineage(self, **lineage): """ .. versionadded:: 0.3.1 Adds a lineage to the taxonomy. It's passed by keyword arguments, where each key is a value in the `TAXON_RANKS` rankes and the value is the scientific name. Appended underscores '_' will be stripped from the rank name. This is for cases such as *class* where the key is a reserved word in Python. Also one extra node can be added, such as strain/cultivar/subspecies and so on, but one only is expected to be passed. Arguments: lineage (dict): the lineage as a keyword arguments Returns: int: the taxon_id of the last element in the lineage Raises: ValueError: if more than a keyword argument is not contained in `TAXON_RANKS` """ lineage = { key.rstrip('_'): value for key, value in viewitems(lineage) } extra_nodes = set(lineage) - set(TAXON_RANKS) if len(extra_nodes) > 1: raise ValueError( "Expected at most a one lower level below species, found: {}".format( ', '.join(extra_nodes) ) ) else: extra_nodes = list(extra_nodes) if len(extra_nodes) == 1 else [] ranks = list(TAXON_RANKS) + extra_nodes parent_ids = [None] taxon_id = None for rank in ranks: try: taxon_name = lineage[rank] except KeyError: continue if not isinstance(taxon_name, str): continue taxon_id = self.add_taxon( taxon_name, rank=rank, parent_id=parent_ids[-1] ) # print taxon_id, rank, taxon_name, parent_ids[-1] parent_ids.append(taxon_id) return taxon_id
[docs] def drop_taxon(self, taxon_id): """ .. versionadded:: 0.3.1 Drops a taxon and all taxa below it in the taxonomy. Also reset the name map for conistency. Arguments: taxon_id (int): taxon_id to drop from the taxonomy """ drop_keys = [] for leaf_id in self._taxa: if self.is_ancestor(leaf_id, taxon_id): drop_keys.append(leaf_id) for taxon_id in drop_keys: del self._taxa[taxon_id] self._name_map = {}
@property def max_id(self): """ .. versionadded:: 0.5.7 Gets the highest taxon_id in the taxonomy """ return max(self._taxa) if self._taxa else 0 @property def min_id(self): """ .. versionadded:: 0.5.7 Gets the lowest taxon_id in the taxonomy """ return min(self._taxa) if self._taxa else 0
[docs] def iter_ids(self): """ .. versionadded:: 0.5.4 Iterates over the taxon IDs """ for taxon_id in self._taxa: yield taxon_id
[docs] def __getitem__(self, taxon_id): """ Defines dictionary behavior. Key is a taxon_id, the returned value is a UniprotTaxon instance """ return self._taxa[taxon_id]
def __setitem__(self, taxon_id, taxon_obj): self._taxa[taxon_id] = taxon_obj def __delitem__(self, taxon_id): del self._taxa[taxon_id]
[docs] def __iter__(self): """ Defines iterable behavior. Returns a generator for UniprotTaxon instances """ for taxon in viewvalues(self._taxa): yield taxon
[docs] def __len__(self): """ Returns the number of taxa contained """ return len(self._taxa)
[docs] def __contains__(self, taxon): """ Returns True if the taxon is in the taxonomy Accepts an int (check for taxon_id) or an instance of UniprotTaxon """ if isinstance(taxon, int): return taxon in self._taxa return False
[docs] def __repr__(self): """ .. versionadded:: 0.2.5 """ return "{} - {} taxa".format(self.__class__, len(self))
UniprotTaxonomy = Taxonomy
[docs]def get_ancestor_map(leaf_ids, anc_ids, taxonomy): """ This function returns a dictionary where every leaf taxon is associated with the right ancestors in anc_ids ex. {clostridium: [bacteria, clostridia]} """ anc_map = {} for leaf_id in leaf_ids: for anc_id in anc_ids: if taxonomy.is_ancestor(leaf_id, anc_id) or (leaf_id == anc_id): try: anc_map[leaf_id].add(anc_id) except KeyError: anc_map[leaf_id] = set() anc_map[leaf_id].add(anc_id) return anc_map
[docs]def is_ancestor(taxonomy, taxon_id, anc_id): """ .. versionchanged:: 0.1.16 if a *taxon_id* raises a KeyError, False is returned Determine if the given taxon id (taxon_id) has anc_id as ancestor. :param :class:`Taxonomy` taxonomy: taxonomy used to test :param int taxon_id: leaf taxon to test :param int anc_id: ancestor taxon to test against :return bool: True if anc_id is an ancestor of taxon_id or their the same """ if taxon_id == anc_id: return True while True: # if a taxon_id is not found, ancestry is invalid anyway, so it returns # False try: taxon_id = taxonomy[taxon_id].parent_id except KeyError: return False # print taxon_id, anc_id if anc_id == taxon_id: return True if taxon_id is None: return False
[docs]class NoLcaFound(Exception): """ .. versionadded:: 0.1.13 Raised if no lowest common ancestor can be found in the taxonomy """ pass
[docs]def last_common_ancestor(taxonomy, taxon_id1, taxon_id2): """ .. versionadded:: 0.1.13 Finds the last common ancestor of two taxon IDs. An alias to this function is in the same module, called *lowest_common_ancestor* for compatibility. Arguments: taxonomy: :class:`Taxonomy` instance used to test taxon_id1 (int): first taxon ID taxon_id2 (int): second taxon ID Raturns: int: taxon ID of the lowest common ancestor Raises: NoLcaFound: if no common ancestor can be found """ lca_id = taxon_id1 while True: if is_ancestor(taxonomy, taxon_id2, lca_id): break else: lca_id = taxonomy[lca_id].parent_id if lca_id is None: raise NoLcaFound('No common ancestry') return lca_id
lowest_common_ancestor = last_common_ancestor
[docs]def distance_taxa_ancestor(taxonomy, taxon_id, anc_id): """ .. versionadded:: 0.1.16 Function to calculate the distance between a taxon and the given ancestor The distance is equal to the number of step in the taxonomy taken to arrive at the ancestor. Arguments: taxonomy: :class:`Taxonomy` instance taxon_id (int): taxonomic identifier anc_id (int): taxonomic identifier of the ancestor Raturns: int: distance between *taxon_id* and it ancestor *anc_id* """ distance = 0 while True: if anc_id == taxon_id: break taxon_id = taxonomy[taxon_id].parent_id distance += 1 return distance
[docs]def distance_two_taxa(taxonomy, taxon_id1, taxon_id2): """ .. versionadded:: 0.1.16 Calculate the distance between two taxa. The distance is equal to the sum steps it takes to traverse the taxonomy until their last common ancestor. Arguments: taxonomy: :class:`Taxonomy` instance taxon_id1 (int): taxonomic identifier of first taxon taxon_id2 (int): taxonomic identifier of second taxon Raturns: int: distance between *taxon_id1* and *taxon_id2* """ lca_id = last_common_ancestor(taxonomy, taxon_id1, taxon_id2) return sum( distance_taxa_ancestor(taxonomy, taxon_id, lca_id) for taxon_id in (taxon_id1, taxon_id2) )
[docs]def taxa_distance_matrix(taxonomy, taxon_ids): """ .. versionadded:: 0.1.16 Given a list of taxonomic identifiers, returns a distance matrix in a pairwise manner by using :func:`distance_two_taxa` on all possible two element combinations of *taxon_ids*. Arguments: taxonomy: :class:`Taxonomy` instance taxon_ids (iterable): list taxonomic identifiers Returns: pandas.DataFrame: matrix with the pairwise distances of all *taxon_ids* """ import pandas matrix = pandas.DataFrame(index=taxon_ids, columns=taxon_ids).fillna(0) for taxon_id1, taxon_id2 in itertools.combinations(taxon_ids, 2): distance = distance_two_taxa(taxonomy, taxon_id1, taxon_id2) matrix.loc[taxon_id1, taxon_id2] = distance matrix.loc[taxon_id2, taxon_id1] = distance return matrix
[docs]def get_lineage(taxonomy, taxon_id, names=False, only_ranked=False, with_last=False, add_rank=False, use_cname=False): """ .. versionadded:: 0.2.1 .. versionchanged:: 0.2.5 added *only_ranked* .. versionchanged:: 0.3.0 added *with_last* .. versionchanged:: 0.5.7 added *add_rank* and *use_cname* Returns the lineage of a taxon_id, as a list of taxon_id or taxa names Arguments: taxonomy: a :class:`Taxonomy` instance taxon_id (int): taxon_id whose lineage to return names (bool): if True, the returned list contains the names of the taxa instead of the taxon_id only_ranked (bool): if True, only taxonomic levels whose rank is in data:`TAXON_RANKS` will be returned with_last (bool): if True, the passed taxon_id is included in the lineage add_rank (bool): prepend the names in the lineage with the first letter of the rank and 2 underscores use_cnames (bool): Use common name (c_name) instead of scientific name (s_name) Returns: list: lineage of the taxon_id, the elements are `int` if names is False, and `str` when *names* is True. If a taxon has no scientific name, the common name is used. If *only_ranked* is True, the returned list only contains ranked taxa (according to :data:`TAXON_RANKS`). """ lineage = [] if with_last: lineage.append(taxon_id) while True: taxon_id = taxonomy[taxon_id].parent_id if taxon_id is None: break if only_ranked and (taxonomy[taxon_id].rank not in TAXON_RANKS): continue lineage.append(taxon_id) if names: lineage_names = [] for taxon_id in lineage: if use_cname: taxon_name = taxonomy[taxon_id].c_name if taxonomy[taxon_id].c_name else taxonomy[taxon_id].s_name else: taxon_name = taxonomy[taxon_id].s_name if taxonomy[taxon_id].s_name else taxonomy[taxon_id].c_name if add_rank: if (taxonomy[taxon_id].rank is not None) and (taxonomy[taxon_id].rank != 'no rank'): if taxonomy[taxon_id].rank == 'superkingdom': prefix = 'k' elif taxonomy[taxon_id].rank == 'strain': prefix = 't' else: prefix = taxonomy[taxon_id].rank[0] taxon_name = prefix + '__' + taxon_name lineage_names.append(taxon_name) lineage = lineage_names return list(reversed(lineage))
[docs]def last_common_ancestor_multiple(taxonomy, taxon_ids): """ .. versionadded:: 0.2.5 Applies :func:`last_common_ancestor` to an iterable that yields *taxon_id* while removing any *None* values. If the list is of one element, that *taxon_id* is returned. Arguments: taxonomy: instance of :class:`Taxonomy` taxon_ids (iterable): an iterable that yields taxon_id Returns: int: the *taxon_id* that is the last common ancestor of all taxon_ids passed Raises: NoLcaFound: when no common ancestry is found or the number of *taxon_ids* is 0 """ taxon_ids = [ taxon_id for taxon_id in taxon_ids if taxon_id is not None ] if not taxon_ids: raise NoLcaFound('Empty *taxon_ids*') return reduce( lambda x1, x2: last_common_ancestor(taxonomy, x1, x2), set(taxon_ids) )