Source code for mgkit.io.gff

"""
This modules define classes and function related to manipulation of GFF/GTF
files.
"""
from __future__ import print_function, division
from builtins import object, zip, bytes, range
from io import IOBase
import random
import itertools
import logging
import uuid
import json
from future.utils import viewitems
from collections import OrderedDict
try:
    from urllib import unquote, quote
except ImportError:
    # Python3
    from urllib.parse import unquote, quote
import mgkit.io
from ..mappings.eggnog import EGGNOG_CAT
from ..utils import sequence as seq_utils
from ..consts import MIN_COV
from ..utils.common import between, union_range, ranges_length
from ..utils.trans_tables import UNIVERSAL

LOG = logging.getLogger(__name__)


[docs]class AttributeNotFound(Exception): """ Raised if an attribute is not found in a GFF file """ pass
[docs]def write_gff(annotations, file_handle, verbose=True): """ .. versionchanged:: 0.1.12 added *verbose* argument Write a GFF to file Arguments: annotations (iterable): iterable that returns :class:`GFFKegg` or :class:`Annotation` instances file_handle (str, file): file name or file handle to write to verbose (bool): if True, a message is logged """ if isinstance(file_handle, str): file_handle = mgkit.io.open_file(file_handle, 'wb') if verbose: LOG.info( "Writing annotations to file (%s)", getattr(file_handle, 'name', repr(file_handle)) ) for annotation in annotations: annotation.to_file(file_handle)
[docs]class GenomicRange(object): """ Defines a genomic range .. versionchanged:: 0.2.1 using __slots__ for better memory usage """ __slots__ = ('seq_id', 'strand', 'start', 'end') # seq_id = 'None' # "Sequence ID" # strand = '+' # "Strand" # start = None # "Start of the range, 1-based" # end = None # "End of the range 1-based" def __init__(self, seq_id='None', start=1, end=1, strand='+'): self.seq_id = seq_id self.strand = strand self.start = start self.end = end def __eq__(self, other): if (self.seq_id != other.seq_id) or (self.strand != other.strand): return False if (self.start != other.start) or (self.end != other.end): return False return True def __len__(self): return self.end - self.start + 1 def __str__(self): return "{0}({1}):{2}-{3}".format( self.seq_id, self.strand, self.start, self.end ) def __repr__(self): return str(self)
[docs] def union(self, other): """ Return the union of two :class:`GenomicRange` """ if (self.seq_id == other.seq_id) and (self.strand == other.strand): result = union_range(self.start, self.end, other.start, other.end) if result is not None: gen_range = GenomicRange() gen_range.seq_id = self.seq_id gen_range.strand = self.strand gen_range.start = result[0] gen_range.end = result[1] return gen_range return None
def __or__(self, other): return self.union(other)
[docs] def expand_from_list(self, others): """ Expand the :class:`GenomicRange` range instance with a list of :class:`GenomicRange` Arguments: others (iterable): iterable of :class:`GenomicRange` """ new_range = self for other in others: union = new_range.union(other) if union is None: continue new_range = union self.start = new_range.start self.end = new_range.end
[docs] def intersect(self, other): """ Return an instance of :class:`GenomicRange` that represent the intersection of the current instance and another. """ if (self.seq_id == other.seq_id) and (self.strand == other.strand): if between(other.start, self.start, self.end) or \ between(other.end, self.start, self.end) or \ between(self.start, other.start, other.end) or \ between(self.end, other.start, other.end): gen_range = GenomicRange() gen_range.start = max(self.start, other.start) gen_range.end = min(self.end, other.end) return gen_range return None
def __and__(self, other): return self.intersect(other)
[docs] def __contains__(self, pos): """ .. versionchanged:: 0.2.3 a range or a subclass are accepted .. versionadded:: 0.1.16 Tests if the position is inside the range of the GenomicRange Pos is 1-based as :attr:`GenomicRange.start` and :attr:`GenomicRange.end` """ if isinstance(pos, int): return between(pos, self.start, self.end) elif isinstance(pos, GenomicRange): pos = (pos.start, pos.end) return (between(pos[0], self.start, self.end)) and \ (between(pos[1], self.start, self.end))
[docs] def get_range(self): """ .. versionadded:: 0.1.13 Returns the start and end position as a tuple """ return (self.start, self.end)
[docs] def get_relative_pos(self, pos): """ .. versionadded:: 0.1.16 Given an absolute position (referred to the reference), convert the position to a coordinate relative to the GenomicRange Returns: int: the position relative to the GenomicRange Raises: ValueError: if the position is not in the range """ if pos not in self: raise ValueError("Position {} not in GenomicRange".format(pos)) return pos - self.start + 1
[docs]class Annotation(GenomicRange): """ .. versionadded:: 0.1.12 .. versionchanged:: 0.2.1 using __slots__ for better memory usage Alternative implementation for an Annotation. When initialised, If *uid* is None, a unique id is added using `uuid.uuid4`. """ __slots__ = ('source', 'feat_type', 'score', 'phase', 'attr') # source = 'None' # "Annotation source" # feat_type = 'None' # "Annotation type (e.g. CDS, gene, exon, etc.)" # score = 0.0 # "Score associated to the annotation" # phase = 0 # "Annotation phase, (0, 1, 2)" # attr = None # "Dictionary with the key value pairs in the last column of a GFF/GTF" def __init__(self, seq_id='None', start=1, end=1, strand='+', source='None', feat_type='None', score=0.0, phase=0, uid=None, **kwd): super(Annotation, self).__init__( seq_id=seq_id, start=start, end=end, strand=strand ) self.source = source self.feat_type = feat_type self.score = score self.phase = phase self.attr = kwd if uid is None: self.uid = str(uuid.uuid4()) else: self.uid = uid def __hash__(self): return hash(self.uid)
[docs] def get_ec(self, level=4): """ .. versionadded:: 0.1.13 .. versionchanged:: 0.2.0 returns a *set* instead of a list Returns the EC values associated with the annotation, cutting them at the desired level. Arguments: level (int): level of classification desired (between 1 and 4) Returns: set: list of all EC numbers associated, at the desired level, if none are found an empty set is returned """ ec = self.attr.get('EC', None) if ec is None: return set() ec = ec.split(',') return set(['.'.join(x.split('.')[:level]) for x in ec])
[docs] def get_fc(self, description=False): """ .. versionadded:: 0.5.7 Arguments: description (bool): If True, instead of the one letter category, the description is returned Returns: list: list of functional categories """ try: fc = self.attr['FC'] except KeyError: return set() return set( EGGNOG_CAT[func_cat] if description else func_cat for func_cat in fc )
[docs] def get_mapping(self, db): """ .. versionadded:: 0.1.13 Returns the mappings, to a particular db, associated with the annotation. Arguments: db (str): database to which the mappings come from Returns: list: list of all mappings associated, to the specified db, if none are found an empty list is returned """ mappings = self.attr.get('map_{0}'.format(db.upper())) if mappings is None: return [] return mappings.split(',')
[docs] def set_mapping(self, db, values): """ .. versionadded:: 0.1.13 Set mappings to a particular db, associated with the annotation. Arguments: db (str): database to which the mappings come from mappings (iterable): iterable of mappings """ self.set_attr( 'map_{0}'.format(db.upper()), ','.join(values) )
[docs] def get_mappings(self): """ .. versionadded:: 0.2.1 Return a dictionary where the keys are the mapping DBs (lowercase) and and the values are the mapping IDs for that DB """ mappings = {} for key in self.attr: if key.startswith('map_'): db = key.replace('map_', '').lower() mappings[db] = self.get_mapping(db) return mappings
@property def taxon_id(self): """ .. versionchanged:: 0.3.1 if taxon_id is set to "None" as a string, it's converted to *None* taxon_id of the annotation """ value = self.attr.get('taxon_id', None) try: value = int(value) except (TypeError, ValueError): value = None return value @taxon_id.setter def taxon_id(self, value): """ .. versionchanged:: 0.5.7 No Exception is raised if the value cannot be converted, but the taxon_id won't be set """ try: self.attr['taxon_id'] = int(value) except TypeError: pass @property def db(self): "db used for the gene_id prediction" return self.attr.get('db', None) @db.setter def db(self, value): self.attr['db'] = value @property def taxon_db(self): "db used for the taxon_id prediction" return self.attr.get('taxon_db', None) @taxon_db.setter def taxon_db(self, value): self.attr['taxon_db'] = value @property def dbq(self): "db quality of the annotation" try: return self.get_attr('dbq', int) except AttributeNotFound: return None @dbq.setter def dbq(self, value): self.attr['dbq'] = value @property def uid(self): """ .. versionadded:: 0.1.13 uid of the annotation """ value = self.attr.get('uid', None) if value is None: # old data where the unique id is marked as ko_idx value = self.attr.get('ko_idx', None) return value @uid.setter def uid(self, value): self.attr['uid'] = value @property def bitscore(self): "bitscore of the annotation" try: return float(self.attr['bitscore']) except KeyError: # legacy for old data bitscore = self.attr.get('bit_score', None) return None if bitscore is None else float(bitscore) @bitscore.setter def bitscore(self, value): self.attr['bitscore'] = float(value) @property def gene_id(self): "gene_id of the annotation, or *ko* if available" try: return self.attr['gene_id'] except KeyError: # legacy for old data return self.attr.get('ko', None) @gene_id.setter def gene_id(self, value): self.attr['gene_id'] = value @property def length(self): """ .. versionchanged:: 0.2.0 Length of the annotation, uses `len(self)` """ return len(self) @property def region(self): """ .. versionadded:: 0.1.13 Return the *region* covered by the annotation, to use in samtools """ return "{0}:{1}:{2}".format(self.seq_id, self.start, self.end) @property def counts(self): """ .. versionadded:: 0.2.2 Returns the sample counts for the annotation """ counts = {} for key, value in viewitems(self.attr): if key.startswith('counts_'): key = key.replace('counts_', '') counts[key] = float(value) return counts @counts.setter def counts(self, counts): """ .. versionadded:: 0.2.2 Sets the sample counts for the annotation Arguments: counts (dict): key is the sample name and the count for it """ for key, value in viewitems(counts): self.attr["counts_{}".format(key)] = value @property def fpkms(self): """ .. versionadded:: 0.2.2 Returns the sample fpkms for the annotation """ fpkms = {} for key, value in viewitems(self.attr): if key.startswith('fpkms_'): key = key.replace('fpkms_', '') fpkms[key] = float(value) return fpkms @fpkms.setter def fpkms(self, fpkms): """ .. versionadded:: 0.2.2 Sets the sample fpkms for the annotation Arguments: fpkms (dict): key is the sample name and the fpmk for it """ for key, value in viewitems(fpkms): self.attr["fpkms_{}".format(key)] = value
[docs] def add_exp_syn_count(self, seq, syn_matrix=None): """ .. versionadded:: 0.1.13 Adds expected synonymous/non-synonymous values for an annotation. Arguments: seq (str): sequence corresponding to the annotation seq_id syn_matrix (None, dict): matrix that determines the return values. Defaults to the one defined in the called function :func:`mgkit.utils.sequnce.get_seq_expected_syn_count`. """ seq = self.get_nuc_seq(seq, reverse=self.strand == '-') syn_count, nonsyn_count = seq_utils.get_seq_expected_syn_count( seq, syn_matrix=syn_matrix ) self.set_attr('exp_syn', syn_count) self.set_attr('exp_nonsyn', nonsyn_count)
[docs] def to_gff(self, sep='='): """ Format the Annotation as a GFF string. Arguments: sep (str): separator key -> value Returns: str: annotation formatted as GFF """ var_names = ( 'seq_id', 'source', 'feat_type', 'start', 'end', 'score', 'strand', 'phase' ) values = '\t'.join( str(getattr(self, var_name)) for var_name in var_names ) attr_column = ';'.join( '{0}{1}"{2}"'.format( key, sep, quote(str(self.attr[key]), ' ()/') ) for key in sorted(self.attr) ) return "{0}\t{1}\n".format(values, attr_column)
[docs] def to_dict(self, exclude_attr=None): """ .. versionadded:: 0.3.1 Return a dictionary representation of the Annotation. Arguments: exclude_attr (str,list): attributes to exclude from the dictionary, can be either a single attribute (string) or a list of strings Returns: dict: dictionary with the annotation """ var_names = ( 'seq_id', 'source', 'feat_type', 'start', 'end', 'score', 'strand', 'phase' ) dictionary = {} for var_name in var_names: dictionary[var_name] = getattr(self, var_name) dictionary.update(self.attr) if exclude_attr is not None: if isinstance(exclude_attr, str): exclude_attr = [exclude_attr] for attr in exclude_attr: del dictionary[attr] return dictionary
[docs] def to_json(self): """ .. versionadded:: 0.2.1 .. versionchanged:: 0.3.1 now :meth:`Annotation.to_dict` is used Returns a json representation of the Annotation """ return json.dumps(self.to_dict(), separators=(',', ':'))
[docs] def to_mongodb(self, lineage_func=None, indent=None, raw=False): """ .. versionadded:: 0.2.1 .. versionchanged:: 0.2.2 added handling of *counts_* and *fpkms_* .. versionchanged:: 0.2.6 added *indent* parameter .. versionchanged:: 0.3.4 added *raw* Returns a MongoDB document that represent the Annotation. Arguments: lineage (func): function used to populate the lineage key, returns a list of taxon_id indent (int): the amount of indent to put in the record, None (the default) is for the most compact - one line for the record raw (bool): if True, the method returns a string, which is the json dump, if False, the value returned is the dictionary Returns: str or dict: the MongoDB document, with Annotation.uid as _id, as a string if *raw* is True, a dictionary if it is False """ # OrderedDict is necessary to keep the order of the keys dictionary = OrderedDict() # _id must be the first element dictionary['_id'] = self.uid var_names = ( 'seq_id', 'source', 'feat_type', 'start', 'end', 'score', 'strand', 'phase', 'gene_id', 'taxon_id', 'bitscore', 'exp_nonsyn', 'exp_syn', 'length', 'dbq', 'coverage', 'uid' ) for var_name in var_names: try: dictionary[var_name] = getattr(self, var_name) except AttributeNotFound: pass ec_ids = self.get_ec() mappings = self.get_mappings() # if one at least has values if ec_ids: mappings['ec'] = list(ec_ids) if lineage_func is not None: dictionary['lineage'] = lineage_func(self.taxon_id) dictionary['map'] = mappings counts = self.counts if counts: dictionary['counts'] = counts fpkms = self.fpkms if fpkms: dictionary['fpkms'] = fpkms # the rest of the dictionary should be put, excluding special keys: # uid is used as _id in the document # EC is put as a array, as is any mapping like map_KO dictionary.update( dict( (key, value) for key, value in viewitems(self.attr) if (key not in var_names) and (key not in ('uid', 'EC')) and (not key.startswith('map_')) and (not key.startswith('counts_')) and (not key.startswith('fpkms_')) ) ) if raw: return dictionary return json.dumps(dictionary, indent=indent, separators=(',', ':'))
[docs] def to_file(self, file_handle): """ Writes the GFF annotation to *file_handle* """ file_handle.write(self.to_gff().encode('ascii'))
[docs] def to_gtf(self, gene_id_attr='uid', sep=' '): """ .. versionadded:: 0.1.15 .. versionchanged:: 0.1.16 added *gene_id_attr* parameter .. versionchanged:: 0.2.2 added *sep* argument, default to a space, now Simple conversion to a valid GTF. gene_id and transcript_id are set to *uid* or the attribute specified using the *gene_id_attr* parameter. It's written to be used with *SNPDat*. """ var_names = ( 'seq_id', 'source', 'feat_type', 'start', 'end', 'score', 'strand', 'phase' ) values = '\t'.join( str(getattr(self, var_name)) for var_name in var_names ) # Keys that needs to be at the start of the attributes gtf_attr = ['gene_id', 'transcript_id'] attr_keys = sorted(self.attr.keys()) # eliminate gene_id (always present in new ones) try: attr_keys.remove('gene_id') except ValueError: pass # transcript_id don't always be there try: attr_keys.remove('transcript_id') except ValueError: pass attr_values = [self.get_attr(gene_id_attr)] * 2 + [ self.attr[attr_key] for attr_key in attr_keys ] attr_keys = gtf_attr + attr_keys attr_column = '; '.join( '{0}{1}"{2}"'.format( key, sep, quote(value, ' ()/') ) for key, value in zip(attr_keys, attr_values) ) return "{0}\t{1}\n".format(values, attr_column)
@property def sample_coverage(self): """ .. versionadded:: 0.1.13 Returns a dictionary with the coverage for each sample, the returned dictionary has the sample id (stripped of the *_cov*) suffix and as values the coverage (converted via :func:`int`). :return dict: dictionary with the samples' coverage """ attributes = self.attr return dict( (attribute.replace('_cov', ''), float(value)) for attribute, value in viewitems(attributes) if attribute.endswith('_cov') )
[docs] def get_number_of_samples(self, min_cov=MIN_COV): """ .. versionadded:: 0.1.13 Returns the number of sample that have at least a minimum coverage of `min_cov`. :param int min_cov: minimum coverage :return int: number of samples passing the filter :raise AttributeNotFound: if no sample coverage attribute is found """ coverage = self.sample_coverage if not coverage: raise AttributeNotFound( 'No coverage attribute found (ending in "_cov")' ) return sum( 1 for sample, coverage in viewitems(coverage) if coverage >= min_cov )
[docs] def get_attr(self, attr, conv=str): """ .. versionchanged:: 0.3.4 any GFF attribute can be returned .. versionchanged:: 0.3.3 added *seq_id* as special attribute, in addition do *length* .. versionadded:: 0.1.13 Generic method to get an attribute and convert it to a specific datatype. The order for the lookup is: * length * self.attr (dictionary) * getattr(self) of the first 8 columns of a GFF (seq_id, source, ...) """ if attr == 'length': return len(self) value = self.attr.get(attr, None) if value is not None: return conv(value) if attr in ('seq_id', 'source', 'feat_type', 'start', 'end', 'score', 'strand', 'phase'): value = getattr(self, attr, None) if value is not None: return conv(value) raise AttributeNotFound('No {0} attribute found'.format(attr))
[docs] def set_attr(self, attr, value): """ .. versionadded:: 0.1.13 .. versionchanged:: 0.4.4 a standard attribute can now be set Generic method to set an attribute """ if attr in ('seq_id', 'source', 'feat_type', 'start', 'end', 'score', 'strand', 'phase'): setattr(self, attr, value) return None self.attr[attr] = value
[docs] def has_attr(self, attr): """ .. versionadded:: 0.4.4 Tests if an attributes is present in the Annotation Arguments: attr (str): attribute to test Returns: bool: True if the attributes is present """ if attr in ('seq_id', 'source', 'feat_type', 'start', 'end', 'score', 'strand', 'phase'): return True return attr in self.attr
[docs] def del_attr(self, attr): """ .. versionadded:: 0.4.4 Removes attributes from the Annotation """ if attr in ('seq_id', 'source', 'feat_type', 'start', 'end', 'score', 'strand', 'phase'): return None try: del self.attr[attr] except KeyError: pass
@property def coverage(self): """ .. versionadded:: 0.1.13 Return the total coverage for the annotation :return float: coverage :raise AttributeNotFound: if no coverage attribute is found """ return self.get_attr('cov', float) @property def exp_syn(self): """ .. versionadded:: 0.1.13 Returns the expected number of synonymous changes """ return self.get_attr('exp_syn', int) @property def exp_nonsyn(self): """ .. versionadded:: 0.1.13 Returns the expected number of non-synonymous changes """ return self.get_attr('exp_nonsyn', int)
[docs] def get_nuc_seq(self, seq, reverse=False, snp=None): """ .. versionadded:: 0.1.13 .. versionchanged:: 0.1.16 added *snp* parameter Returns the nucleotidic sequence that the annotation covers. if the annotation's strand is *-*, and *reverse* is True, the reverse complement is returned. Arguments: seq (seq): chromosome/contig sequence reverse (bool): if True and the strand is '-', a reverse complement is returned snp (tuple): first element is the position of the SNP relative to the Annotation and the second element is the change Returns: str: nucleotide sequence with requested transformations """ ann_seq = seq[self.start - 1:self.end] if snp is not None: ann_seq = seq_utils.get_variant_sequence(ann_seq, snp) if (self.strand == '-') and reverse: ann_seq = seq_utils.reverse_complement(ann_seq) return ann_seq
[docs] def get_aa_seq(self, seq, start=0, tbl=None, snp=None): """ .. versionadded:: 0.1.16 Returns a translated aminoacid sequence of the annotation. The snp parameter is passed to :meth:`Annotation.get_nuc_seq` Arguments: seq (seq): chromosome/contig sequence start (int): position (0-based) from where the correct occurs (frame). If None, the phase attribute is used tbl (dict): dictionary with the translation for each codon, passed to :func:`mgkit.utils.sequence.translate_sequence` snp (tuple): first element is the position of the SNP and the second element is the change Returns: str: aminoacid sequence """ if start is None: start = self.phase nuc_seq = self.get_nuc_seq(seq, reverse=True, snp=snp) return seq_utils.translate_sequence( nuc_seq, start=start, tbl=None, reverse=False )
[docs] def add_gc_content(self, seq): """ Adds GC content information for an annotation. The formula is: .. math:: :label: gc_content_gff \\frac {(G + C)}{(G + C + A + T)} Modifies the instances of the annotation. gc_ratio will be added to its attributes. Arguments: seq (str): nucleotide sequence referred in the GFF """ ann_seq = self.get_nuc_seq( seq, reverse=True if self.strand == '-' else False ) at_sum = (ann_seq.count('A') + ann_seq.count('T')) gc_sum = (ann_seq.count('G') + ann_seq.count('C')) gc_cont = gc_sum / (gc_sum + at_sum) self.set_attr('gc_cont', gc_cont)
[docs] def add_gc_ratio(self, seq): """ Adds GC content information for an annotation. The formula is: .. math:: :label: gc_ratio_gff \\frac {(A + T)}{(G + C)} Modifies the instances of the annotation. gc_ratio will be added to its attributes. Arguments: seq (str): nucleotide sequence referred in the GFF """ ann_seq = self.get_nuc_seq( seq, reverse=True if self.strand == '-' else False ) at_sum = (ann_seq.count('A') + ann_seq.count('T')) gc_sum = (ann_seq.count('G') + ann_seq.count('C')) gc_ratio = at_sum / gc_sum self.set_attr('gc_ratio', gc_ratio)
[docs] def is_syn(self, seq, pos, change, tbl=None, abs_pos=True, start=0, strict=True): """ .. versionadded:: 0.1.16 .. versionchanged:: 0.4.4 added *strict* parameter Return if a SNP is synonymous or non-synonymous. Arguments: seq (seq): reference sequence of the annotation pos (int): position of the SNP on the reference (1-based index) change (str): nucleotidic change tbl (dict): dictionary with the translation table. Defaults to the universal genetic code abs_pos (bool): if True the *pos* is referred to the reference and not a position relative to the annotation start (int or None): phase to be used to get the start position of the codon. if None, the Annotation phase will be used strict (bool): if a variant codon is not found, a KeyError is raised, otherwise *False* is returned Returns: bool: True if the SNP is synonymous, false if it's non-synonymous. Behaviour in case of variant codons not found in the translation table changes based on *strict* Raises: KeyError: if the variant codon is not found and *strict* is True """ if abs_pos: rel_pos = self.get_relative_pos(pos) else: rel_pos = pos if start is None: start = self.phase if tbl is None: tbl = UNIVERSAL # codon number in the sequence codon_index = (rel_pos - start - 1) // 3 # the position to slice the seq to get a codon (0-based). It takes into # account the phase (start) and the codon index seq_start = (self.start + start + (codon_index * 3) - 1) # position in the codon using the relative position and the phase/frame # the module will give 1, 2 or 0. -1 will shift the position correctly codon_change = ((rel_pos - start) % 3) - 1 codon = seq[seq_start:seq_start+3] var_codon = list(codon) var_codon[codon_change] = change var_codon = ''.join(var_codon) if self.strand == '-': codon = seq_utils.reverse_complement(codon) var_codon = seq_utils.reverse_complement(var_codon) try: return UNIVERSAL[codon] == UNIVERSAL[var_codon] except KeyError: LOG.warning("""Annotation %s has an unrecognised codon: reference %s, variant %s""", self.uid, codon, var_codon) if strict: raise KeyError("Variant codon not found %s", var_codon) else: return False
[docs]def from_glimmer3(header, line, feat_type='CDS'): """ .. versionadded:: 0.1.12 Parses the line of a GLIMMER3 ouput and returns an instance of a GFF annotation. Arguments: header (str): the seq_id to which the ORF belongs line (str): the prediction line for the orf feat_type (str): the feature type to use Returns: Annotation: instance of annotation Example: Assuming a GLIMMER3 output like this:: >sequence0001 orf00001 66 611 +3 6.08 The code used is: >>> header = 'sequence0001' >>> line = 'orf00001 66 611 +3 6.08' >>> from_glimmer3(header, line) """ if isinstance(line, bytes): line = line.decode('ascii') orf_id, start, end, frame, score = line.split() start = int(start) end = int(end) if start > end: start, end = end, start annotation = Annotation( seq_id=header, source='GLIMMER3', feat_type=feat_type, start=start, end=end, score=float(score), strand=frame[0], phase=int(frame[1]) - 1, frame=frame, glimmer_score=float(score), orf_id=orf_id ) return annotation
[docs]class DuplicateKeyError(Exception): """ .. versionadded:: 0.1.12 Raised if a GFF annotation contains duplicate keys """ pass
[docs]def from_gff(line, strict=True, encoding='ascii'): """ .. versionadded:: 0.1.12 .. versionchanged:: 0.2.6 added *strict* parameter .. versionchanged:: 0.4.0 added *encoding* parameter Parse GFF line and returns an :class:`Annotation` instance Arguments: line (str): GFF line strict (bool): if True duplicate keys raise an exception Returns: Annotation: instance of :class:`Annotation` for the line Raises: DuplicateKeyError: if the attribute column has duplicate keys """ if isinstance(line, bytes): line = line.decode(encoding) line = line.rstrip().split('\t') # in case the last column (attributes) is empty if len(line) < 9: values = line # bug in which the phase was not written if len(line[-1]) > 1: line.insert(-1, 0) else: values = line[:-1] var_names = ( 'seq_id', 'source', 'feat_type', 'start', 'end', 'score', 'strand', 'phase' ) # the phase sometimes can be set as unknown, using '-'. We prefer using 0 var_types = (str, str, str, int, int, float, str, lambda x: 0 if x == '' else int(x)) attr = {} for var, value, vtype in zip(var_names, values, var_types): try: attr[var] = vtype(value) except ValueError: attr[var] = value # in case the last column (attributes) is empty if len(line) < 9: return Annotation(**attr) for pair in line[-1].split(';'): try: # by default the key,value separator '=' is assumed to be used var, value = pair.strip().split('=', 1) except ValueError: # in case it doesn't work, it is assumed to be a space if ' ' in pair.strip(): var, value = pair.strip().split(' ', 1) else: # case in which there's an attribute but no value, like a bool var = pair.strip() value = None if (var in attr): if strict: raise DuplicateKeyError("Duplicate attribute: {0}".format(var)) else: old_var = var var = '_dup_' + var LOG.debug("Duplicate attribute: %s -> %s", old_var, var) # skips possible key/values generated by the line ending with a ';' if not var: continue if value is not None: value = unquote(value.replace('"', '')) attr[var] = value return Annotation(**attr)
[docs]def from_sequence(name, seq, feat_type='SEQUENCE', **kwd): """ .. versionadded:: 0.1.12 Returns an instance of :class:`Annotation` for the full length of a sequence Arguments: name (str): name of the sequence seq (str): sequence, to get the length of the annotation Keyword Args: feat_type (str): feature type in the GFF **kwd: any additional column Returns: Annotation: instance of :class:`Annotation` """ annotation = Annotation( seq_id=name, source='SEQUENCE', feat_type=feat_type, start=1, end=len(seq), score=0.0, strand='+', phase=0, sequence=name, **kwd ) return annotation
[docs]def from_aa_blast_frag(hit, parent_ann, aa_seqs): frag_id, frame = hit[0].split('-') strand = '+' if frame.startswith('f') else '-' frame = int(frame[1]) identity = hit[2] bitscore = hit[-1] start = hit[3] end = hit[4] if strand == '-': start, end = seq_utils.reverse_aa_coord( start, end, len(aa_seqs[hit[0]]) ) start, end = seq_utils.convert_aa_to_nuc_coord(start, end, frame) annotation = Annotation( seq_id=parent_ann.seq_id, source='BLAST', feat_type='CDS', start=start + parent_ann.start - 1, end=end + parent_ann.start - 1, score=bitscore, strand=strand, phase=frame, db='UNIPROT', gene_id=hit[1], identity=identity, bitscore=bitscore, ID=frag_id ) return annotation
[docs]def from_nuc_blast_frag(hit, parent_ann, db='NCBI-NT'): frag_id = hit[0] strand = '+' identity = hit[2] bitscore = hit[-1] start = hit[3] end = hit[4] annotation = Annotation( seq_id=parent_ann.seq_id, source='BLAST', feat_type='CDS', start=start + parent_ann.start - 1, end=end + parent_ann.start - 1, score=bitscore, strand=strand, phase=0, db=db, gene_id=hit[1], identity=identity, bitscore=bitscore, ID=frag_id ) return annotation
[docs]def annotate_sequence(name, seq, window=None): length = len(seq) if window is None: window = length for index in range(1, length, window): annotation = Annotation.from_sequence(name, seq) annotation.start = index annotation.end = index + window - 1 if annotation.end > length: annotation.end = length yield annotation
[docs]def from_nuc_blast(hit, db, feat_type='CDS', seq_len=None, to_nuc=False, **kwd): """ .. versionadded:: 0.1.12 .. versionchanged:: 0.1.16 added *to_nuc* parameter .. versionchanged:: 0.2.3 removed *to_nuc*, the hit can include the subject end/start and evalue Returns an instance of :class:`Annotation` Arguments: hit (tuple): a BLAST hit, from :func:`mgkit.io.blast.parse_blast_tab` db (str): db used with BLAST Keyword Args: feat_type (str): feature type in the GFF seq_len (int): sequence length, if supplied, the phase for strand '-' can be assigned, otherwise is assigned a 0 and the sequence coverage are added **kwd: any additional column Returns: Annotation: instance of :class:`Annotation` """ seq_id = hit[0] gene_id = hit[1] strand = '+' identity = hit[2] bitscore = hit[-1] start = hit[3] end = hit[4] phase = 0 if start > end: start, end = end, start strand = '-' if seq_len is not None: phase = (seq_len - end) % 3 if seq_len is not None: query_coverage = (end - start + 1.) / seq_len * 100 kwd['query_coverage'] = query_coverage if strand == '+': phase = (start - 1) % 3 annotation = Annotation( seq_id=seq_id, source='BLAST', feat_type=feat_type, start=start, end=end, score=bitscore, strand=strand, phase=phase, db=db, gene_id=gene_id, identity=identity, bitscore=bitscore, frame="{}{}".format('f' if strand == '+' else 'r', phase), **kwd ) # the hit includes subject end/start and evalue, as per new version of # mgkit.io.blast.parse_uniprot_blast if len(hit) == 9: annotation.attr['evalue'] = hit[-2] annotation.attr['subject_end'] = hit[-3] annotation.attr['subject_start'] = hit[-4] return annotation
[docs]def from_json(line): """ .. versionadded:: 0.2.1 Returns an Annotation from a json representation """ return Annotation(**json.loads(line))
[docs]def from_hmmer(line, aa_seqs, feat_type='gene', source='HMMER', db='CUSTOM', custom_profiles=True, noframe=False): """ .. versionadded:: 0.1.15 first implementation to move old scripts to new GFF specs .. versionchanged:: 0.2.1 removed compatibility with old scripts .. versionchanged:: 0.2.2 taxon_id and taxon_name are not saved for non-custom profiles .. versionchanged:: 0.3.1 added support for non mgkit-translated sequences (*noframe*) Parse HMMER results (one line), it won't parse commented lines (starting with *#*) Arguments: line (str): HMMER domain table line aa_seqs (dict): dictionary with amino-acid sequences (name->seq), used to get the correct nucleotide positions feat_type (str): string to be used in the 'feature type' column source (str): string to be used in the 'source' column custom_profiles (bool): if True, the profile name contains gene, taxonomy and reviewed information in the form KOID_TAXONID_TAXON-NAME(-nr) noframe (bool): if True, the sequence is assumed to be in frame f0 Returns: A :class:`Annotation` instance .. note:: if `custom_profiles` is False, gene_id, taxon_id and taxon_name will be equal to the profile name """ if isinstance(line, bytes): line = line.decode('ascii') line = line.split() if noframe: # no information on the frame is provided (already a protein, so f0) frame = 'f0' contig = line[0] else: contig, frame = line[0].rsplit('-', 1) t_from = int(line[17]) t_to = int(line[18]) # first get coordinate if sequence is reversed if frame.startswith('r'): seq_len = len(aa_seqs[line[0]]) t_from, t_to = seq_utils.reverse_aa_coord(t_from, t_to, seq_len) # necessary only if frame information available if not noframe: # converts in nucleotide coordinates t_from, t_to = seq_utils.convert_aa_to_nuc_coord( t_from, t_to, frame=int(frame[-1]) ) # maintains the aa coordinates aa_from = int(line[17]) aa_to = int(line[18]) profile_name = line[3] score = float(line[6]) if custom_profiles: # KOID_TAXONID_TAXON-NAME(-nr) reviewed = 'False' if profile_name.endswith('-nr') else 'True' gene_id, taxon_id, taxon_name = profile_name.split('_') else: gene_id = profile_name taxon_id = taxon_name = None annotation = Annotation( seq_id=contig, source=source, feat_type=feat_type, start=t_from, end=t_to, score=score, strand='-' if frame.startswith('r') else '+', phase=int(frame[1]), db=db, gene_id=gene_id, taxon_id=taxon_id, bitscore=float(line[7]), # custom for HMMER profiles aa_from=aa_from, aa_to=aa_to, # stores the aa sequence aa_seq=aa_seqs[line[0]][aa_from - 1:aa_to], # evalue evalue=score, # maintains HMMER profile information: # profile name name=profile_name, # both strand/phase (e.g r2) frame=frame, # old version of uid # ko_idx=ko_idx, # used in other old profiles, where the taxon name was used instead # of a taxon ID taxon_name=taxon_name ) try: annotation.attr['reviewed'] = reviewed except UnboundLocalError: pass # removes the None values from non-custom profiles if taxon_id is None: del annotation.attr['taxon_id'] if taxon_name is None: del annotation.attr['taxon_name'] return annotation
[docs]def parse_gff(file_handle, gff_type=from_gff, strict=False, encoding='ascii', filter_func=None): """ .. versionchanged:: 0.5.7 added *filter_func* .. versionchanged:: 0.4.0 In some cases ASCII decoding is not enough, so it is parametrised now .. versionchanged:: 0.3.4 added decoding from binary for compatibility with Python3 .. versionchanged:: 0.2.6 added *strict* parameter .. versionchanged:: 0.2.3 correctly handling of GFF with comments of appended sequences .. versionchanged:: 0.1.12 added *gff_type* parameter Parse a GFF file and returns generator of :class:`GFFKegg` instances Accepts a file handle or a string with the file name Arguments: file_handle (str, file): file name or file handle to read from gff_type (class): class/function used to parse a GFF annotation strict (bool): if True duplicate keys raise an exception encoding (str): encoding of the file, if ascii fails, use utf8 file_func (func): a function used to filter, accepts an annotation as argument and returns True if the annotation is to be kept Yields: Annotation: an iterator of :class:`Annotation` instances """ file_handle = mgkit.io.open_file(file_handle, 'rb') LOG.info( "Loading GFF from file (%s)", getattr(file_handle, 'name', repr(file_handle)) ) index = 0 for index, line in enumerate(file_handle): try: line = line.decode(encoding) except UnicodeError: raise UnicodeError("Impossible to decode line to {}: {}".format(encoding, line)) # the first is for GFF with comments and the second for # GFF with the fasta file attached if line.startswith('#'): continue if line.startswith('>'): break annotation = gff_type(line, strict=strict) if filter_func is not None: if not filter_func(annotation): continue yield annotation LOG.info( "Read %d line from file (%s)", index + 1, getattr(file_handle, 'name', repr(file_handle)) )
[docs]def diff_gff(files, key_func=None): """ .. versionadded:: 0.1.12 Returns a simple diff made between a list of gff files. The annotations are grouped using *key_func*, so it depends on it to find similar annotations. Arguments: files (iterable): an iterable of file handles, pointing to GFF files key_func (func): function used to group annotations, defaults to this key: *(x.seq_id, x.strand, x.start, x.end, x.gene_id, x.bitscore)* Returns: dict: the returned dictionary keys are determined by key_func and as values lists. The lists elements are tuple whose first element is the index of the file, relative to *files* and the second element is the line number in which the annotation is. Can be used with the :mod:`linecache` module. """ if isinstance(files, str) or len(files) == 1: return if key_func is None: def key_func(x): return (x.seq_id, x.strand, x.start, x.end, x.gene_id, x.bitscore) gff_diff = {} for index, file_handle in enumerate(files): for lineno, annotation in enumerate(parse_gff(file_handle)): key = key_func(annotation) try: gff_diff[key].append((index, lineno)) except KeyError: gff_diff[key] = [(index, lineno)] return gff_diff
[docs]def annotation_elongation(ann1, annotations): """ .. versionadded:: 0.1.12 Given an :class:`Annotation` instance and a list of the instances of the same class, returns the longest overlapping range that can be found and the annotations that are included in it. .. warning:: annotations are not checked for seq_id and strand Arguments: ann1 (Annotation): annotation to elongate annotations (iterable): iterable of :class:`Annotation` instances Returns: tuple: the first element is the longest range found, while the the second element is a set with the annotations used """ used = set([ann1]) union = (ann1.start, ann1.end) for ann2 in annotations: new_union = union_range(union[0], union[1], ann2.start, ann2.end) if new_union is not None: used.add(ann2) union = new_union return union, used
[docs]def elongate_annotations(annotations): """ .. versionadded:: 0.1.12 Given an iterable of :class:`Annotation` instances, tries to find the all possible longest ranges and returns them. .. warning:: annotations are not checked for seq_id and strand Arguments: annotations (iterable): iterable of :class:`Annotation` instances Returns: set: set with the all ranges found """ annotations = sorted(annotations, key=lambda x: x.start) ranges = set() while len(annotations) > 0: ann1 = annotations.pop(0) union, used = annotation_elongation(ann1, annotations) if union is None: ranges.add((ann1.start, ann1.end)) else: annotations = sorted(set(annotations) - used, key=lambda x: x.start) ranges.add(union) return ranges
[docs]def annotation_coverage(annotations, seqs, strand=True): """ .. versionadded:: 0.1.12 Given a list of annotations and a dictionary where the keys are the sequence names referred in the annotations and the values are the sequences themselves, returns a number which indicated how much the sequence length is "covered" in annotations. If *strand* is True the coverage is strand specific. Arguments: annotations (iterable): iterable of :class:`Annotation` instances seqs (dict): dictionary in which the keys are the sequence names and the values are the sequences strand (bool): if True, the values are strand specific (the annotations) are grouped by (seq_id, strand) instead of seq_id Yields: tuple: the first element is the key, (seq_id, strand) if *strand* is True or seq_id if *strand* is False, and the coverage is the second value. """ if strand: def key_func(x): return (x.seq_id, x.strand) else: def key_func(x): return x.seq_id annotations = group_annotations( annotations, key_func=key_func ) for key, key_ann in viewitems(annotations): if isinstance(key, str): seq_len = len(seqs[key]) else: seq_len = len(seqs[key[0]]) covered = ranges_length(elongate_annotations(key_ann)) yield key, covered / seq_len * 100
[docs]def annotation_coverage_sorted(annotations, seqs, strand=True): """ .. versionadded:: 0.3.1 Given a list of annotations and a dictionary where the keys are the sequence names referred in the annotations and the values are the sequences themselves, returns a number which indicated how much the sequence length is "covered" in annotations. If *strand* is True the coverage is strand specific. .. note:: It differs from :func:`annotation_coverage` because it assumes the annotations are correctly sorted and in the values yielded Arguments: annotations (iterable): iterable of :class:`Annotation` instances seqs (dict): dictionary in which the keys are the sequence names and the values are the sequences strand (bool): if True, the values are strand specific (the annotations) are grouped by (seq_id, strand) instead of seq_id Yields: tuple: the first element is the seq_id, the second the strand (if strand is True, else it's set to *None*), and the third element is the coverage. """ if strand: def key_func(x): return (x.seq_id, x.strand) else: def key_func(x): return x.seq_id annotations = group_annotations_sorted( annotations, key_func=key_func ) for ann in annotations: seq_id = ann[0].seq_id if strand: ann_strand = ann[0].strand else: ann_strand = None seq_len = len(seqs[seq_id]) covered = ranges_length(elongate_annotations(ann)) yield seq_id, ann_strand, covered / seq_len * 100
[docs]def group_annotations(annotations, key_func=lambda x: (x.seq_id, x.strand)): """ .. versionadded:: 0.1.12 Group :class:`Annotation` instances in a dictionary by using a key function that returns the key to be used in the dictionary. Arguments: annotations (iterable): iterable with :class:`Annotation` instances key_func (func): function used to extract the key used in the dictionary, defaults to a function that returns (ann.seq_id, ann.strand) Returns: dict: dictionary whose keys are returned by *key_func* and the values are lists of annotations Example: >>> ann = [Annotation(seq_id='seq1', strand='+', start=10, end=15), ... Annotation(seq_id='seq1', strand='+', start=1, end=5), ... Annotation(seq_id='seq1', strand='-', start=30, end=100)] >>> group_annotations(ann) {('seq1', '+'): [seq1(+):10-15, seq1(+):1-5], ('seq1', '-'): [seq1(-):30-100]} """ grouped = {} for annotation in annotations: key = key_func(annotation) try: grouped[key].append(annotation) except KeyError: grouped[key] = [annotation] return grouped
[docs]def group_annotations_sorted(annotations, key_func=lambda x: (x.seq_id, x.strand)): """ .. versionadded:: 0.1.13 Group :class:`Annotation` instances by using a key function that returns a key. Assumes that the annotations are already sorted to return an iterator and save memory. One way to sort them is using: `sort -s -k 1,1 -k 7,7` on the file. Arguments: annotations (iterable): iterable with :class:`Annotation` instances key_func (func): function used to extract the key used in the dictionary, defaults to a function that returns (ann.seq_id, ann.strand) Yields: list: a list of the grouped annotations by *key_func* values """ curr_key = '' curr_ann = [] for annotation in annotations: new_key = key_func(annotation) if curr_key == new_key: curr_ann.append(annotation) else: if curr_key == '': curr_ann.append(annotation) curr_key = new_key else: yield curr_ann curr_key = new_key curr_ann = [annotation] else: yield curr_ann
[docs]def extract_nuc_seqs(annotations, seqs, name_func=lambda x: x.uid, reverse=False): """ .. versionadded:: 0.1.13 Extract the nucleotidic sequences from a list of annotations. Internally uses the method :meth:`Annotation.get_nuc_seq`. Arguments: annotations (iterable): iterable of :class:`Annotation` instances seqs (dict): dictionary with the sequences referenced in the annotations name_func (func): function used to extract the sequence name to be used, defaults to the uid of the annotation reverse (bool): if True the annotations on the *-* strand are reverse complemented Yields: tuple: tuple whose first element is the sequence name and the second is the sequence to which the annotation refers. """ for annotation in annotations: name = name_func(annotation) seq = annotation.get_nuc_seq(seqs[annotation.seq_id], reverse=reverse) yield name, seq
[docs]def group_annotations_by_ancestor(annotations, ancestors, taxonomy): """ .. versionadded:: 0.1.13 Group annotations by the ancestors provided. Arguments: annotations (iterable): annotations to group ancestors (iterable): list of ancestors accepted taxonomy: taxonomy class Returns: dict: grouped annotations """ ann_dict = dict((ancestor, []) for ancestor in ancestors) unknown = [] for annotation in annotations: anc_found = False for ancestor, anc_ids in viewitems(ancestors): if taxonomy.is_ancestor(annotation.taxon_id, anc_ids): ann_dict[ancestor].append(annotation) anc_found = True break if not anc_found: unknown.append(annotation) return ann_dict, unknown
[docs]def split_gff_file(file_handle, name_mask, num_files=2, encoding='ascii'): """ .. versionadded:: 0.1.14 .. versionchanged:: 0.2.6 now accept a file object as sole input .. versionchanged:: 0.4.0 added *encoding* parameter Splits a GFF, or a list of them, into a number of files. It is assured that annotations for the same sequence are kept in the same file, which is useful for cases like filtering, even when the annotations are from different GFF files. Internally, a structure is kept to check if a sequence ID is already been stored to a file, in which case the annotation is written to that file, otherwise a random file handles (among the open ones) is chosen. Arguments: file_handle (str, list): a single or list of file handles (or file names), from which the GFF annotations are read name_mask (str): a string used as template for the output file names on which the function applies :func:`string.format` num_files (int): the number of files to split the records Example: >>> import glob >>> files = glob.glob('*.gff') >>> name_mask = 'split-file-{0}.gff' >>> split_gff_file(files, name_mask, 5) """ if not isinstance(file_handle, IOBase): if isinstance(file_handle, str): file_handle = [file_handle] file_handle = itertools.chain( *(mgkit.io.open_file(x, 'rb') for x in file_handle) ) out_handles = [ mgkit.io.open_file(name_mask.format(filen), 'wb') for filen in range(num_files) ] seq_ids = {} for line in file_handle: line = line.decode(encoding) if line.startswith('#'): continue if line.startswith('>'): break seq_id = line.split('\t')[0] try: out_handle = out_handles[seq_ids[seq_id]] except KeyError: new_index = random.randint(0, num_files - 1) seq_ids[seq_id] = new_index out_handle = out_handles[new_index] out_handle.write(line.encode('ascii'))
[docs]def load_gff_base_info(files, taxonomy=None, exclude_ids=None, include_taxa=None, encoding='ascii'): """ This function is useful if the number of annotations in a GFF is high or there are memory constraints on the system. It returns a dictionary that can be used with functions like :func:`mgkit.counts.func.load_sample_counts`. Arguments: files (iterable, str): file name or list of paths of GFF files taxonomy: taxonomy pickle file, needed if include_taxa is not None exclude_ids (set, list): a list of gene_id to exclude from the dictionary include_taxa (int, iterable): a taxon_id or list thereof to be passed to :meth:`mgkit.taxon.taxonomy.is_ancestor`, so only the taxa that have the those taxon_id(s) as ancestor(s) are kept encoding (str): passed to :func:`parse_gff` Returns: dict: dictionary where the key is :attr:`Annotation.uid` and the value is a tuple (:attr:`Annotation.gene_id`, :attr:`Annotation.taxon_id`) """ if isinstance(files, str): files = [files] infos = {} for fname in files: for annotation in parse_gff(fname, encoding=encoding): # no information on taxa - exclude if annotation.taxon_id is None: continue # to exclude ribosomial genes or any other kind if exclude_ids is not None: if annotation.gene_id in exclude_ids: continue if (include_taxa is not None) and (taxonomy is not None): if not taxonomy.is_ancestor(annotation.taxon_id, include_taxa): continue infos[annotation.uid] = (annotation.gene_id, annotation.taxon_id) return infos
[docs]def load_gff_mappings(files, map_db, taxonomy=None, exclude_ids=None, include_taxa=None, encoding='ascii'): """ This function is useful if the number of annotations in a GFF is high or there are memory constraints on the system. It returns a dictionary that can be used with functions like :func:`mgkit.counts.func.load_sample_counts`. Arguments: files (iterable, str): file name or list of paths of GFF files map_db (str): any kind mapping in the GFF, as passed to :meth:`Annotation.get_mapping` taxonomy: taxonomy pickle file, needed if include_taxa is not None exclude_ids (set, list): a list of gene_id to exclude from the dictionary include_taxa (int, iterable): a taxon_id or list thereof to be passed to :meth:`mgkit.taxon.taxonomy.is_ancestor`, so only the taxa that have the those taxon_id(s) as ancestor(s) are kept encoding (str): passed to :func:`parse_gff` Returns: dict: dictionary where the key is :attr:`Annotation.gene_id` and the value is a list of mappings, as returned by :meth:`Annotation.get_mapping` """ infos = {} for fname in files: for annotation in parse_gff(fname, encoding=encoding): # skips genes that are already in the mapping if annotation.gene_id in infos: continue # exclude genes with no taxonomic information if annotation.taxon_id is None: continue if exclude_ids is not None: if annotation.gene_id in exclude_ids: continue # skips non bacterial/achaeal genes if (include_taxa is not None) and (taxonomy is not None): if not taxonomy.is_ancestor(annotation.taxon_id, include_taxa): continue infos[annotation.gene_id] = annotation.get_mapping(map_db) return infos
[docs]def parse_gff_files(files, strict=True): """ .. versionadded:: 0.1.15 .. versionchanged:: 0.2.6 added *strict* parameter Function that returns an iterator of annotations from multiple GFF files. Arguments: files (iterable, str): iterable of file names of GFF files, or a single file name strict (bool): if True duplicate keys raise an exception Yields: :class:`Annotation`: iterator of annotations """ if isinstance(files, str): files = [files] return itertools.chain(*(parse_gff(file_name, strict=strict) for file_name in files))
[docs]def get_annotation_map(annotations, key_func, value_func): """ .. versionadded:: 0.1.15 Applies two functions to an iterable of annotations with an iterator returned with the applied functions. Useful to build a dictionary Arguments: annotations (iterable): iterable of annotations key_func (func): function that accept an annotation as argument and returns one value, the first of the returned tuple value_func (func): function that accept an annotation as argument and returns one value, the second of the returned tuple Yields: tuple: a tuple where the first value is the result of *key_func* on the passed annotation and the second is the value returned by *value_func* on the same annotation """ for annotation in annotations: yield key_func(annotation), value_func(annotation)
[docs]def convert_gff_to_gtf(file_in, file_out, gene_id_attr='uid'): """ .. versionadded:: 0.1.16 Function that uses :meth:`Annotation.to_gtf` to convert a GFF into GTF. Arguments: file_in (str, file): either file name or file handle of a GFF file file_out (str): file name to which write the converted annotations """ LOG.info("Writing GTF file to %s", file_out) file_out = open(file_out, 'w') for annotation in parse_gff(file_in): file_out.write(annotation.to_gtf())
[docs]def from_mongodb(record, lineage=True): """ .. versionadded:: 0.2.1 .. versionchanged:: 0.2.2 added handling of *counts_* and *fpkms_* .. versionchanged:: 0.2.6 better handling of missing attributes and added *lineage* parameter Returns a :class:`Annotation` instance from a MongoDB record (created) using :meth:`Annotation.to_mongodb`. The actual record returned by pymongo is a dictionary that is copied, manipulated and passed to the :meth:`Annotation.__init__`. Arguments: record (dict): a dictionary with the full record from a MongoDB query lineage (bool): indicates if the lineage information in the record should be kept in the annotation Returns: Annotation: instance of :class:`Annotation` object """ record = record.copy() record['uid'] del record['_id'] if 'map' in record: mappings = record['map'].copy() del record['map'] try: record['EC'] = ','.join(mappings['ec']) del mappings['ec'] except KeyError: pass try: for key in mappings: record['map_{}'.format(key.upper())] = ','.join(mappings[key]) except KeyError: pass try: counts = record['counts'].copy() del record['counts'] for key in counts: record['counts_{}'.format(key)] = counts[key] except KeyError: pass try: fpkms = record['fpkms'].copy() del record['fpkms'] for key in fpkms: record['fpkms_{}'.format(key)] = fpkms[key] except KeyError: pass if ('lineage' in record) and (not lineage): del record['lineage'] return Annotation(**record)
[docs]def from_prodigal_frag(main_gff, blast_gff, attr='ID', split_func=None): """ .. versionchanged:: 0.3.3 fixed a bug for the strand, also the code is tested .. versionadded:: 0.2.6 *experimental* Reads the GFF given in output by PRODIGAL and the resulting GFF from using BLAST (or other software) on the aa or nucleotide file output by PRODIGAL. It then integrates the two outputs, so to the PRODIGAL GFF is added the information from the the output of the gene prediction software used. Arguments: main_gff (file): GFF file from PRODIGAL blast_gff (file): GFF with the returned annotations attr (str): attribute in the PRODIGAL GFF that is used to identify an annotation split_func (func): function to rename the headers from the predicted sequences back to their parent sequence Yields: annotation: annotation for each *blast_gff* back translated """ if split_func is None: def split_func(x): return tuple(x.rsplit('_', 1)) prodigal_gff = {} for annotation in parse_gff(main_gff, strict=False): key = ( annotation.seq_id, split_func(annotation.get_attr(attr))[1] ) prodigal_gff[key] = ( annotation.start, annotation.end, annotation.strand, annotation.get_attr(attr) ) for annotation in parse_gff(blast_gff): key = split_func(annotation.seq_id) annotation.set_attr('prodigal_start', annotation.start) annotation.set_attr('prodigal_end', annotation.end) annotation.set_attr('prodigal_strand', annotation.strand) start, end, strand, p_id = prodigal_gff[key] annotation.seq_id = key[0] annotation.start = start annotation.end = end annotation.strand = strand annotation.set_attr('prodigal_ID', p_id) yield annotation