"""
This module defines classes and functions related to the manipulation of GFF/GTF
files.
"""
from __future__ import print_function, division
from builtins import object, zip, bytes, range
from io import IOBase
import random
import itertools
import logging
import uuid
import json
from future.utils import viewitems
from collections import OrderedDict
try:
from urllib import unquote, quote
except ImportError:
# Python3
from urllib.parse import unquote, quote
import mgkit.io
from ..mappings.eggnog import EGGNOG_CAT
from ..utils import sequence as seq_utils
from ..consts import MIN_COV
from ..utils.common import between, union_range, ranges_length
from ..utils.trans_tables import UNIVERSAL
LOG = logging.getLogger(__name__)
class AttributeNotFound(Exception):
"""
Raised if an attribute is not found in a GFF file
"""
pass
def write_gff(annotations, file_handle, verbose=True):
"""
.. versionchanged:: 0.1.12
added *verbose* argument
Write a GFF to file
Arguments:
annotations (iterable): iterable that returns :class:`GFFKegg`
or :class:`Annotation` instances
file_handle (str, file): file name or file handle to write to
verbose (bool): if True, a message is logged
"""
if isinstance(file_handle, str):
file_handle = mgkit.io.open_file(file_handle, 'wb')
if verbose:
LOG.info(
"Writing annotations to file (%s)",
getattr(file_handle, 'name', repr(file_handle))
)
for annotation in annotations:
annotation.to_file(file_handle)
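# A minimal usage sketch for write_gff: filtering annotations from an input
# GFF and writing them back out. The file names and the bitscore threshold
# below are illustrative only.
#
#   from mgkit.io import gff
#   annotations = (
#       ann for ann in gff.parse_gff('input.gff')
#       if ann.bitscore is not None and ann.bitscore >= 30
#   )
#   gff.write_gff(annotations, 'filtered.gff')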
class GenomicRange(object):
"""
Defines a genomic range
.. versionchanged:: 0.2.1
using __slots__ for better memory usage
"""
__slots__ = ('seq_id', 'strand', 'start', 'end')
# seq_id = 'None'
# "Sequence ID"
# strand = '+'
# "Strand"
# start = None
# "Start of the range, 1-based"
# end = None
# "End of the range 1-based"
def __init__(self, seq_id='None', start=1, end=1, strand='+'):
self.seq_id = seq_id
self.strand = strand
self.start = start
self.end = end
def __eq__(self, other):
if (self.seq_id != other.seq_id) or (self.strand != other.strand):
return False
if (self.start != other.start) or (self.end != other.end):
return False
return True
def __len__(self):
return self.end - self.start + 1
def __str__(self):
return "{0}({1}):{2}-{3}".format(
self.seq_id,
self.strand,
self.start,
self.end
)
def __repr__(self):
return str(self)
def union(self, other):
"""
Return the union of two :class:`GenomicRange`
"""
if (self.seq_id == other.seq_id) and (self.strand == other.strand):
result = union_range(self.start, self.end, other.start, other.end)
if result is not None:
gen_range = GenomicRange()
gen_range.seq_id = self.seq_id
gen_range.strand = self.strand
gen_range.start = result[0]
gen_range.end = result[1]
return gen_range
return None
def __or__(self, other):
return self.union(other)
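# A short sketch of how union/__or__ combine two overlapping ranges on the
# same seq_id and strand; the coordinates are illustrative.
#
#   >>> a = GenomicRange(seq_id='seq1', start=10, end=100, strand='+')
#   >>> b = GenomicRange(seq_id='seq1', start=50, end=150, strand='+')
#   >>> a | b
#   seq1(+):10-150
#   >>> (a | GenomicRange(seq_id='seq2', start=50, end=150)) is None
#   True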
def expand_from_list(self, others):
"""
Expand the :class:`GenomicRange` range instance with a list of
:class:`GenomicRange`
Arguments:
others (iterable): iterable of :class:`GenomicRange`
"""
new_range = self
for other in others:
union = new_range.union(other)
if union is None:
continue
new_range = union
self.start = new_range.start
self.end = new_range.end
def intersect(self, other):
"""
Return an instance of :class:`GenomicRange` that represents the
intersection of the current instance and another.
"""
if (self.seq_id == other.seq_id) and (self.strand == other.strand):
if between(other.start, self.start, self.end) or \
between(other.end, self.start, self.end) or \
between(self.start, other.start, other.end) or \
between(self.end, other.start, other.end):
gen_range = GenomicRange()
gen_range.start = max(self.start, other.start)
gen_range.end = min(self.end, other.end)
return gen_range
return None
def __and__(self, other):
return self.intersect(other)
def __contains__(self, pos):
"""
.. versionchanged:: 0.2.3
a range or a subclass are accepted
.. versionadded:: 0.1.16
Tests if the position is inside the range of the GenomicRange
Pos is 1-based as :attr:`GenomicRange.start` and
:attr:`GenomicRange.end`
"""
if isinstance(pos, int):
return between(pos, self.start, self.end)
elif isinstance(pos, GenomicRange):
pos = (pos.start, pos.end)
return (between(pos[0], self.start, self.end)) and \
(between(pos[1], self.start, self.end))
def get_range(self):
"""
.. versionadded:: 0.1.13
Returns the start and end position as a tuple
"""
return (self.start, self.end)
def get_relative_pos(self, pos):
"""
.. versionadded:: 0.1.16
Given an absolute position (referred to the reference), convert the
position to a coordinate relative to the GenomicRange
Returns:
int: the position relative to the GenomicRange
Raises:
ValueError: if the position is not in the range
"""
if pos not in self:
raise ValueError("Position {} not in GenomicRange".format(pos))
return pos - self.start + 1
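# A brief sketch of membership testing and coordinate conversion; positions
# are 1-based and the values are illustrative.
#
#   >>> gen_range = GenomicRange(seq_id='seq1', start=10, end=100)
#   >>> 10 in gen_range, 101 in gen_range
#   (True, False)
#   >>> gen_range.get_relative_pos(12)
#   3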
class Annotation(GenomicRange):
"""
.. versionadded:: 0.1.12
.. versionchanged:: 0.2.1
using __slots__ for better memory usage
Alternative implementation for an Annotation. When initialised, if *uid* is
None, a unique id is added using `uuid.uuid4`.
"""
__slots__ = ('source', 'feat_type', 'score', 'phase', 'attr')
# source = 'None'
# "Annotation source"
# feat_type = 'None'
# "Annotation type (e.g. CDS, gene, exon, etc.)"
# score = 0.0
# "Score associated to the annotation"
# phase = 0
# "Annotation phase, (0, 1, 2)"
# attr = None
# "Dictionary with the key value pairs in the last column of a GFF/GTF"
def __init__(self, seq_id='None', start=1, end=1, strand='+',
source='None', feat_type='None', score=0.0, phase=0, uid=None,
**kwd):
super(Annotation, self).__init__(
seq_id=seq_id,
start=start,
end=end,
strand=strand
)
self.source = source
self.feat_type = feat_type
self.score = score
self.phase = phase
self.attr = kwd
if uid is None:
self.uid = str(uuid.uuid4())
else:
self.uid = uid
def __hash__(self):
return hash(self.uid)
def get_ec(self, level=4):
"""
.. versionadded:: 0.1.13
.. versionchanged:: 0.2.0
returns a *set* instead of a list
Returns the EC values associated with the annotation, cutting them at
the desired level.
Arguments:
level (int): level of classification desired (between 1 and 4)
Returns:
set: set of all EC numbers associated, at the desired level; if
none are found an empty set is returned
"""
ec = self.attr.get('EC', None)
if ec is None:
return set()
ec = ec.split(',')
return set(['.'.join(x.split('.')[:level]) for x in ec])
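# A minimal sketch of get_ec; the EC attribute value is illustrative and is
# expected to be a comma-separated string as parsed from the GFF.
#
#   >>> ann = Annotation(EC='1.1.1.1,1.2.3.4')
#   >>> sorted(ann.get_ec(level=2))
#   ['1.1', '1.2']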
def get_fc(self, description=False):
"""
.. versionadded:: 0.5.7
Arguments:
description (bool): If True, instead of the one letter category,
the description is returned
Returns:
set: set of functional categories
"""
try:
fc = self.attr['FC']
except KeyError:
return set()
return set(
EGGNOG_CAT[func_cat] if description else func_cat
for func_cat in fc
)
def get_mapping(self, db):
"""
.. versionadded:: 0.1.13
Returns the mappings, to a particular db, associated with the
annotation.
Arguments:
db (str): database to which the mappings come from
Returns:
list: list of all mappings associated to the specified db; if
none are found an empty list is returned
"""
mappings = self.attr.get('map_{0}'.format(db.upper()))
if mappings is None:
return []
return mappings.split(',')
def set_mapping(self, db, values):
"""
.. versionadded:: 0.1.13
Set mappings to a particular db, associated with the
annotation.
Arguments:
db (str): database to which the mappings come from
values (iterable): iterable of mappings
"""
self.set_attr(
'map_{0}'.format(db.upper()),
','.join(values)
)
def get_mappings(self):
"""
.. versionadded:: 0.2.1
Return a dictionary where the keys are the mapping DBs (lowercase) and
the values are the mapping IDs for that DB
"""
mappings = {}
for key in self.attr:
if key.startswith('map_'):
db = key.replace('map_', '').lower()
mappings[db] = self.get_mapping(db)
return mappings
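# A short sketch of the mapping helpers; the database name and the IDs are
# illustrative. Mappings are stored as comma-separated values in 'map_*'
# attributes.
#
#   >>> ann = Annotation()
#   >>> ann.set_mapping('ko', ['K00001', 'K00002'])
#   >>> ann.get_mapping('ko')
#   ['K00001', 'K00002']
#   >>> ann.get_mappings()
#   {'ko': ['K00001', 'K00002']}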
@property
def taxon_id(self):
"""
.. versionchanged:: 0.3.1
if taxon_id is set to "None" as a string, it's converted to *None*
taxon_id of the annotation
"""
value = self.attr.get('taxon_id', None)
try:
value = int(value)
except (TypeError, ValueError):
value = None
return value
@taxon_id.setter
def taxon_id(self, value):
"""
.. versionchanged:: 0.5.7
No Exception is raised if the value cannot be converted, but
the taxon_id won't be set
"""
try:
self.attr['taxon_id'] = int(value)
except (TypeError, ValueError):
pass
@property
def db(self):
"db used for the gene_id prediction"
return self.attr.get('db', None)
@db.setter
def db(self, value):
self.attr['db'] = value
@property
def taxon_db(self):
"db used for the taxon_id prediction"
return self.attr.get('taxon_db', None)
@taxon_db.setter
def taxon_db(self, value):
self.attr['taxon_db'] = value
@property
def dbq(self):
"db quality of the annotation"
try:
return self.get_attr('dbq', int)
except AttributeNotFound:
return None
@dbq.setter
def dbq(self, value):
self.attr['dbq'] = value
@property
def uid(self):
"""
.. versionadded:: 0.1.13
uid of the annotation
"""
value = self.attr.get('uid', None)
if value is None:
# old data where the unique id is marked as ko_idx
value = self.attr.get('ko_idx', None)
return value
@uid.setter
def uid(self, value):
self.attr['uid'] = value
@property
def bitscore(self):
"bitscore of the annotation"
try:
return float(self.attr['bitscore'])
except KeyError:
# legacy for old data
bitscore = self.attr.get('bit_score', None)
return None if bitscore is None else float(bitscore)
@bitscore.setter
def bitscore(self, value):
self.attr['bitscore'] = float(value)
@property
def gene_id(self):
"gene_id of the annotation, or *ko* if available"
try:
return self.attr['gene_id']
except KeyError:
# legacy for old data
return self.attr.get('ko', None)
@gene_id.setter
def gene_id(self, value):
self.attr['gene_id'] = value
@property
def length(self):
"""
.. versionchanged:: 0.2.0
Length of the annotation, uses `len(self)`
"""
return len(self)
@property
def region(self):
"""
.. versionadded:: 0.1.13
Return the *region* covered by the annotation, to use in samtools
"""
return "{0}:{1}:{2}".format(self.seq_id, self.start, self.end)
@property
def counts(self):
"""
.. versionadded:: 0.2.2
Returns the sample counts for the annotation
"""
counts = {}
for key, value in viewitems(self.attr):
if key.startswith('counts_'):
key = key.replace('counts_', '')
counts[key] = float(value)
return counts
@counts.setter
def counts(self, counts):
"""
.. versionadded:: 0.2.2
Sets the sample counts for the annotation
Arguments:
counts (dict): key is the sample name and the count for it
"""
for key, value in viewitems(counts):
self.attr["counts_{}".format(key)] = value
@property
def fpkms(self):
"""
.. versionadded:: 0.2.2
Returns the sample fpkms for the annotation
"""
fpkms = {}
for key, value in viewitems(self.attr):
if key.startswith('fpkms_'):
key = key.replace('fpkms_', '')
fpkms[key] = float(value)
return fpkms
@fpkms.setter
def fpkms(self, fpkms):
"""
.. versionadded:: 0.2.2
Sets the sample fpkms for the annotation
Arguments:
fpkms (dict): key is the sample name and the fpmk for it
"""
for key, value in viewitems(fpkms):
self.attr["fpkms_{}".format(key)] = value
def add_exp_syn_count(self, seq, syn_matrix=None):
"""
.. versionadded:: 0.1.13
Adds expected synonymous/non-synonymous values for an annotation.
Arguments:
seq (str): sequence corresponding to the annotation seq_id
syn_matrix (None, dict): matrix that determines the return
values. Defaults to the one defined in the function
:func:`mgkit.utils.sequence.get_seq_expected_syn_count`.
"""
seq = self.get_nuc_seq(seq, reverse=self.strand == '-')
syn_count, nonsyn_count = seq_utils.get_seq_expected_syn_count(
seq,
syn_matrix=syn_matrix
)
self.set_attr('exp_syn', syn_count)
self.set_attr('exp_nonsyn', nonsyn_count)
def to_gff(self, sep='='):
"""
Format the Annotation as a GFF string.
Arguments:
sep (str): separator key -> value
Returns:
str: annotation formatted as GFF
"""
var_names = (
'seq_id', 'source', 'feat_type', 'start', 'end',
'score', 'strand', 'phase'
)
values = '\t'.join(
str(getattr(self, var_name))
for var_name in var_names
)
attr_column = ';'.join(
'{0}{1}"{2}"'.format(
key,
sep,
quote(str(self.attr[key]), ' ()/')
)
for key in sorted(self.attr)
)
return "{0}\t{1}\n".format(values, attr_column)
def to_dict(self, exclude_attr=None):
"""
.. versionadded:: 0.3.1
Return a dictionary representation of the Annotation.
Arguments:
exclude_attr (str,list): attributes to exclude from the dictionary,
can be either a single attribute (string) or a list of strings
Returns:
dict: dictionary with the annotation
"""
var_names = (
'seq_id', 'source', 'feat_type', 'start', 'end',
'score', 'strand', 'phase'
)
dictionary = {}
for var_name in var_names:
dictionary[var_name] = getattr(self, var_name)
dictionary.update(self.attr)
if exclude_attr is not None:
if isinstance(exclude_attr, str):
exclude_attr = [exclude_attr]
for attr in exclude_attr:
del dictionary[attr]
return dictionary
def to_json(self):
"""
.. versionadded:: 0.2.1
.. versionchanged:: 0.3.1
now :meth:`Annotation.to_dict` is used
Returns a json representation of the Annotation
"""
return json.dumps(self.to_dict(), separators=(',', ':'))
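# A minimal round-trip sketch using to_json together with the module-level
# from_json function defined further down; the attribute values are
# illustrative.
#
#   >>> ann = Annotation(seq_id='seq1', start=1, end=30, gene_id='K00001')
#   >>> line = ann.to_json()
#   >>> from_json(line).gene_id
#   'K00001'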
def to_mongodb(self, lineage_func=None, indent=None, raw=False):
"""
.. versionadded:: 0.2.1
.. versionchanged:: 0.2.2
added handling of *counts_* and *fpkms_*
.. versionchanged:: 0.2.6
added *indent* parameter
.. versionchanged:: 0.3.4
added *raw*
Returns a MongoDB document that represent the Annotation.
Arguments:
lineage_func (func): function used to populate the lineage key, returns
a list of taxon_id
indent (int): the amount of indent to put in the record, None (the
default) is for the most compact - one line for the record
raw (bool): if True, the method returns a string, which is the
json dump, if False, the value returned is the dictionary
Returns:
str or dict: the MongoDB document, with Annotation.uid as _id, as
a string if *raw* is True, a dictionary if it is False
"""
# OrderedDict is necessary to keep the order of the keys
dictionary = OrderedDict()
# _id must be the first element
dictionary['_id'] = self.uid
var_names = (
'seq_id', 'source', 'feat_type', 'start', 'end', 'score', 'strand',
'phase', 'gene_id', 'taxon_id', 'bitscore', 'exp_nonsyn',
'exp_syn', 'length', 'dbq', 'coverage', 'uid'
)
for var_name in var_names:
try:
dictionary[var_name] = getattr(self, var_name)
except AttributeNotFound:
pass
ec_ids = self.get_ec()
mappings = self.get_mappings()
# if one at least has values
if ec_ids:
mappings['ec'] = list(ec_ids)
if lineage_func is not None:
dictionary['lineage'] = lineage_func(self.taxon_id)
dictionary['map'] = mappings
counts = self.counts
if counts:
dictionary['counts'] = counts
fpkms = self.fpkms
if fpkms:
dictionary['fpkms'] = fpkms
# the rest of the dictionary should be put, excluding special keys:
# uid is used as _id in the document
# EC is put as a array, as is any mapping like map_KO
dictionary.update(
dict(
(key, value)
for key, value in viewitems(self.attr)
if (key not in var_names) and (key not in ('uid', 'EC')) and
(not key.startswith('map_')) and
(not key.startswith('counts_')) and
(not key.startswith('fpkms_'))
)
)
if raw:
return dictionary
return json.dumps(dictionary, indent=indent, separators=(',', ':'))
def to_file(self, file_handle):
"""
Writes the GFF annotation to *file_handle*
"""
file_handle.write(self.to_gff().encode('ascii'))
def to_gtf(self, gene_id_attr='uid', sep=' '):
"""
.. versionadded:: 0.1.15
.. versionchanged:: 0.1.16
added *gene_id_attr* parameter
.. versionchanged:: 0.2.2
added *sep* argument, defaulting to a space
Simple conversion to a valid GTF. gene_id and transcript_id are set to
*uid* or the attribute specified using the *gene_id_attr* parameter.
It's written to be used with *SNPDat*.
"""
var_names = (
'seq_id', 'source', 'feat_type', 'start', 'end',
'score', 'strand', 'phase'
)
values = '\t'.join(
str(getattr(self, var_name))
for var_name in var_names
)
# Keys that need to be at the start of the attributes
gtf_attr = ['gene_id', 'transcript_id']
attr_keys = sorted(self.attr.keys())
# eliminate gene_id (always present in new ones)
try:
attr_keys.remove('gene_id')
except ValueError:
pass
# transcript_id may not always be there
try:
attr_keys.remove('transcript_id')
except ValueError:
pass
attr_values = [self.get_attr(gene_id_attr)] * 2 + [
self.attr[attr_key]
for attr_key in attr_keys
]
attr_keys = gtf_attr + attr_keys
attr_column = '; '.join(
'{0}{1}"{2}"'.format(
key,
sep,
quote(value, ' ()/')
)
for key, value in zip(attr_keys, attr_values)
)
return "{0}\t{1}\n".format(values, attr_column)
@property
def sample_coverage(self):
"""
.. versionadded:: 0.1.13
Returns a dictionary with the coverage for each sample; the returned
dictionary has the sample id (stripped of the *_cov* suffix) as keys and
the coverage as values (converted via :func:`float`).
:return dict: dictionary with the samples' coverage
"""
attributes = self.attr
return dict(
(attribute.replace('_cov', ''), float(value))
for attribute, value in viewitems(attributes)
if attribute.endswith('_cov')
)
def get_number_of_samples(self, min_cov=MIN_COV):
"""
.. versionadded:: 0.1.13
Returns the number of samples that have at least a minimum coverage of
`min_cov`.
:param int min_cov: minimum coverage
:return int: number of samples passing the filter
:raise AttributeNotFound: if no sample coverage attribute is found
"""
coverage = self.sample_coverage
if not coverage:
raise AttributeNotFound(
'No coverage attribute found (ending in "_cov")'
)
return sum(
1 for sample, coverage in viewitems(coverage)
if coverage >= min_cov
)
def get_attr(self, attr, conv=str):
"""
.. versionchanged:: 0.3.4
any GFF attribute can be returned
.. versionchanged:: 0.3.3
added *seq_id* as special attribute, in addition to *length*
.. versionadded:: 0.1.13
Generic method to get an attribute and convert it to a specific
datatype. The order for the lookup is:
* length
* self.attr (dictionary)
* getattr(self) of the first 8 columns of a GFF (seq_id, source, ...)
"""
if attr == 'length':
return len(self)
value = self.attr.get(attr, None)
if value is not None:
return conv(value)
if attr in ('seq_id', 'source', 'feat_type', 'start', 'end', 'score',
'strand', 'phase'):
value = getattr(self, attr, None)
if value is not None:
return conv(value)
raise AttributeNotFound('No {0} attribute found'.format(attr))
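# A brief sketch of get_attr lookups and conversions; the attribute names
# and values are illustrative.
#
#   >>> ann = Annotation(seq_id='seq1', start=1, end=30, cov='9')
#   >>> ann.get_attr('length')
#   30
#   >>> ann.get_attr('cov', int)
#   9
#   >>> ann.get_attr('seq_id')
#   'seq1'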
def set_attr(self, attr, value):
"""
.. versionadded:: 0.1.13
.. versionchanged:: 0.4.4
a standard attribute can now be set
Generic method to set an attribute
"""
if attr in ('seq_id', 'source', 'feat_type', 'start', 'end', 'score',
'strand', 'phase'):
setattr(self, attr, value)
return None
self.attr[attr] = value
def has_attr(self, attr):
"""
.. versionadded:: 0.4.4
Tests if an attribute is present in the Annotation
Arguments:
attr (str): attribute to test
Returns:
bool: True if the attribute is present
"""
if attr in ('seq_id', 'source', 'feat_type', 'start', 'end', 'score',
'strand', 'phase'):
return True
return attr in self.attr
def del_attr(self, attr):
"""
.. versionadded:: 0.4.4
Removes an attribute from the Annotation
"""
if attr in ('seq_id', 'source', 'feat_type', 'start', 'end', 'score',
'strand', 'phase'):
return None
try:
del self.attr[attr]
except KeyError:
pass
@property
def coverage(self):
"""
.. versionadded:: 0.1.13
Return the total coverage for the annotation
:return float: coverage
:raise AttributeNotFound: if no coverage attribute is found
"""
return self.get_attr('cov', float)
@property
def exp_syn(self):
"""
.. versionadded:: 0.1.13
Returns the expected number of synonymous changes
"""
return self.get_attr('exp_syn', int)
@property
def exp_nonsyn(self):
"""
.. versionadded:: 0.1.13
Returns the expected number of non-synonymous changes
"""
return self.get_attr('exp_nonsyn', int)
def get_nuc_seq(self, seq, reverse=False, snp=None):
"""
.. versionadded:: 0.1.13
.. versionchanged:: 0.1.16
added *snp* parameter
Returns the nucleotide sequence that the annotation covers. If the
annotation's strand is *-* and *reverse* is True, the reverse
complement is returned.
Arguments:
seq (seq): chromosome/contig sequence
reverse (bool): if True and the strand is '-', a reverse complement
is returned
snp (tuple): first element is the position of the SNP relative to
the Annotation and the second element is the change
Returns:
str: nucleotide sequence with requested transformations
"""
ann_seq = seq[self.start - 1:self.end]
if snp is not None:
ann_seq = seq_utils.get_variant_sequence(ann_seq, snp)
if (self.strand == '-') and reverse:
ann_seq = seq_utils.reverse_complement(ann_seq)
return ann_seq
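# A short sketch of get_nuc_seq on a reverse-strand annotation; the sequence
# and the coordinates are illustrative.
#
#   >>> seq = 'ACCTAAGGGTT'
#   >>> ann = Annotation(seq_id='seq1', start=2, end=7, strand='-')
#   >>> ann.get_nuc_seq(seq)
#   'CCTAAG'
#   >>> ann.get_nuc_seq(seq, reverse=True)
#   'CTTAGG'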
def get_aa_seq(self, seq, start=0, tbl=None, snp=None):
"""
.. versionadded:: 0.1.16
Returns a translated aminoacid sequence of the annotation. The snp
parameter is passed to :meth:`Annotation.get_nuc_seq`
Arguments:
seq (seq): chromosome/contig sequence
start (int): position (0-based) from which the translation starts
(frame). If None, the phase attribute is used
tbl (dict): dictionary with the translation for each codon,
passed to :func:`mgkit.utils.sequence.translate_sequence`
snp (tuple): first element is the position of the SNP and the
second element is the change
Returns:
str: aminoacid sequence
"""
if start is None:
start = self.phase
nuc_seq = self.get_nuc_seq(seq, reverse=True, snp=snp)
return seq_utils.translate_sequence(
nuc_seq,
start=start,
tbl=tbl,
reverse=False
)
def add_gc_content(self, seq):
"""
Adds GC content information for an annotation. The formula is:
.. math::
:label: gc_content_gff
\\frac {(G + C)}{(G + C + A + T)}
Modifies the instance of the annotation; gc_cont will be added to its
attributes.
Arguments:
seq (str): nucleotide sequence referred in the GFF
"""
ann_seq = self.get_nuc_seq(
seq,
reverse=True if self.strand == '-' else False
)
at_sum = (ann_seq.count('A') + ann_seq.count('T'))
gc_sum = (ann_seq.count('G') + ann_seq.count('C'))
gc_cont = gc_sum / (gc_sum + at_sum)
self.set_attr('gc_cont', gc_cont)
def add_gc_ratio(self, seq):
"""
Adds GC content information for an annotation. The formula is:
.. math::
:label: gc_ratio_gff
\\frac {(A + T)}{(G + C)}
Modifies the instance of the annotation; gc_ratio will be added to its
attributes.
Arguments:
seq (str): nucleotide sequence referred in the GFF
"""
ann_seq = self.get_nuc_seq(
seq,
reverse=True if self.strand == '-' else False
)
at_sum = (ann_seq.count('A') + ann_seq.count('T'))
gc_sum = (ann_seq.count('G') + ann_seq.count('C'))
gc_ratio = at_sum / gc_sum
self.set_attr('gc_ratio', gc_ratio)
def is_syn(self, seq, pos, change, tbl=None, abs_pos=True, start=0,
strict=True):
"""
.. versionadded:: 0.1.16
.. versionchanged:: 0.4.4
added *strict* parameter
Return if a SNP is synonymous or non-synonymous.
Arguments:
seq (seq): reference sequence of the annotation
pos (int): position of the SNP on the reference (1-based index)
change (str): nucleotidic change
tbl (dict): dictionary with the translation table. Defaults to the
universal genetic code
abs_pos (bool): if True the *pos* is referred to the reference and
not a position relative to the annotation
start (int or None): phase to be used to get the start position of
the codon. if None, the Annotation phase will be used
strict (bool): if a variant codon is not found, a KeyError is
raised, otherwise *False* is returned
Returns:
bool: True if the SNP is synonymous, False if it's non-synonymous.
Behaviour in case of variant codons not found in the translation
table changes based on *strict*
Raises:
KeyError: if the variant codon is not found and *strict* is True
"""
if abs_pos:
rel_pos = self.get_relative_pos(pos)
else:
rel_pos = pos
if start is None:
start = self.phase
if tbl is None:
tbl = UNIVERSAL
# codon number in the sequence
codon_index = (rel_pos - start - 1) // 3
# the position to slice the seq to get a codon (0-based). It takes into
# account the phase (start) and the codon index
seq_start = (self.start + start + (codon_index * 3) - 1)
# position in the codon using the relative position and the phase/frame
# the module will give 1, 2 or 0. -1 will shift the position correctly
codon_change = ((rel_pos - start) % 3) - 1
codon = seq[seq_start:seq_start+3]
var_codon = list(codon)
var_codon[codon_change] = change
var_codon = ''.join(var_codon)
if self.strand == '-':
codon = seq_utils.reverse_complement(codon)
var_codon = seq_utils.reverse_complement(var_codon)
try:
return tbl[codon] == tbl[var_codon]
except KeyError:
LOG.warning("""Annotation %s has an unrecognised codon:
reference %s, variant %s""", self.uid, codon,
var_codon)
if strict:
raise KeyError("Variant codon not found %s", var_codon)
else:
return False
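# A worked sketch for is_syn on a forward-strand annotation covering a full
# ORF; the sequence and the SNP are illustrative. Position 6 falls in the
# second codon (GCT); changing it to A gives GCA, which still codes for
# alanine, so the SNP is synonymous.
#
#   >>> seq = 'ATGGCTAAA'
#   >>> ann = Annotation(seq_id='seq1', start=1, end=9, strand='+', phase=0)
#   >>> ann.is_syn(seq, 6, 'A')
#   True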
def from_glimmer3(header, line, feat_type='CDS'):
"""
.. versionadded:: 0.1.12
Parses a line of GLIMMER3 output and returns an instance of a GFF
annotation.
Arguments:
header (str): the seq_id to which the ORF belongs
line (str): the prediction line for the ORF
feat_type (str): the feature type to use
Returns:
Annotation: instance of annotation
Example:
Assuming a GLIMMER3 output like this::
>sequence0001
orf00001 66 611 +3 6.08
The code used is:
>>> header = 'sequence0001'
>>> line = 'orf00001 66 611 +3 6.08'
>>> from_glimmer3(header, line)
"""
if isinstance(line, bytes):
line = line.decode('ascii')
orf_id, start, end, frame, score = line.split()
start = int(start)
end = int(end)
if start > end:
start, end = end, start
annotation = Annotation(
seq_id=header,
source='GLIMMER3',
feat_type=feat_type,
start=start,
end=end,
score=float(score),
strand=frame[0],
phase=int(frame[1]) - 1,
frame=frame,
glimmer_score=float(score),
orf_id=orf_id
)
return annotation
class DuplicateKeyError(Exception):
"""
.. versionadded:: 0.1.12
Raised if a GFF annotation contains duplicate keys
"""
pass
def from_gff(line, strict=True, encoding='ascii'):
"""
.. versionadded:: 0.1.12
.. versionchanged:: 0.2.6
added *strict* parameter
.. versionchanged:: 0.4.0
added *encoding* parameter
Parse GFF line and returns an :class:`Annotation` instance
Arguments:
line (str): GFF line
strict (bool): if True duplicate keys raise an exception
Returns:
Annotation: instance of :class:`Annotation` for the line
Raises:
DuplicateKeyError: if the attribute column has duplicate keys
"""
if isinstance(line, bytes):
line = line.decode(encoding)
line = line.rstrip().split('\t')
# in case the last column (attributes) is empty
if len(line) < 9:
values = line
# bug in which the phase was not written
if len(line[-1]) > 1:
line.insert(-1, 0)
else:
values = line[:-1]
var_names = (
'seq_id', 'source', 'feat_type', 'start', 'end',
'score', 'strand', 'phase'
)
# the phase sometimes can be set as unknown, using '-'. We prefer using 0
var_types = (str, str, str, int, int, float, str,
lambda x: 0 if x == '' else int(x))
attr = {}
for var, value, vtype in zip(var_names, values, var_types):
try:
attr[var] = vtype(value)
except ValueError:
attr[var] = value
# in case the last column (attributes) is empty
if len(line) < 9:
return Annotation(**attr)
for pair in line[-1].split(';'):
try:
# by default the key,value separator '=' is assumed to be used
var, value = pair.strip().split('=', 1)
except ValueError:
# in case it doesn't work, it is assumed to be a space
if ' ' in pair.strip():
var, value = pair.strip().split(' ', 1)
else:
# case in which there's an attribute but no value, like a bool
var = pair.strip()
value = None
if (var in attr):
if strict:
raise DuplicateKeyError("Duplicate attribute: {0}".format(var))
else:
old_var = var
var = '_dup_' + var
LOG.debug("Duplicate attribute: %s -> %s", old_var, var)
# skips possible key/values generated by the line ending with a ';'
if not var:
continue
if value is not None:
value = unquote(value.replace('"', ''))
attr[var] = value
return Annotation(**attr)
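# A minimal sketch of parsing a single GFF line; the line content is
# illustrative.
#
#   >>> line = 'seq1\tBLAST\tCDS\t1\t30\t30.0\t+\t0\tgene_id="K00001";taxon_id="2"'
#   >>> ann = from_gff(line)
#   >>> ann.gene_id, ann.taxon_id
#   ('K00001', 2)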
def from_sequence(name, seq, feat_type='SEQUENCE', **kwd):
"""
.. versionadded:: 0.1.12
Returns an instance of :class:`Annotation` for the full length of a
sequence
Arguments:
name (str): name of the sequence
seq (str): sequence, to get the length of the annotation
Keyword Args:
feat_type (str): feature type in the GFF
**kwd: any additional column
Returns:
Annotation: instance of :class:`Annotation`
"""
annotation = Annotation(
seq_id=name,
source='SEQUENCE',
feat_type=feat_type,
start=1,
end=len(seq),
score=0.0,
strand='+',
phase=0,
sequence=name,
**kwd
)
return annotation
def from_aa_blast_frag(hit, parent_ann, aa_seqs):
frag_id, frame = hit[0].split('-')
strand = '+' if frame.startswith('f') else '-'
frame = int(frame[1])
identity = hit[2]
bitscore = hit[-1]
start = hit[3]
end = hit[4]
if strand == '-':
start, end = seq_utils.reverse_aa_coord(
start,
end,
len(aa_seqs[hit[0]])
)
start, end = seq_utils.convert_aa_to_nuc_coord(start, end, frame)
annotation = Annotation(
seq_id=parent_ann.seq_id,
source='BLAST',
feat_type='CDS',
start=start + parent_ann.start - 1,
end=end + parent_ann.start - 1,
score=bitscore,
strand=strand,
phase=frame,
db='UNIPROT',
gene_id=hit[1],
identity=identity,
bitscore=bitscore,
ID=frag_id
)
return annotation
def from_nuc_blast_frag(hit, parent_ann, db='NCBI-NT'):
frag_id = hit[0]
strand = '+'
identity = hit[2]
bitscore = hit[-1]
start = hit[3]
end = hit[4]
annotation = Annotation(
seq_id=parent_ann.seq_id,
source='BLAST',
feat_type='CDS',
start=start + parent_ann.start - 1,
end=end + parent_ann.start - 1,
score=bitscore,
strand=strand,
phase=0,
db=db,
gene_id=hit[1],
identity=identity,
bitscore=bitscore,
ID=frag_id
)
return annotation
def annotate_sequence(name, seq, window=None):
length = len(seq)
if window is None:
window = length
for index in range(1, length, window):
annotation = Annotation.from_sequence(name, seq)
annotation.start = index
annotation.end = index + window - 1
if annotation.end > length:
annotation.end = length
yield annotation
def from_nuc_blast(hit, db, feat_type='CDS', seq_len=None, to_nuc=False,
**kwd):
"""
.. versionadded:: 0.1.12
.. versionchanged:: 0.1.16
added *to_nuc* parameter
.. versionchanged:: 0.2.3
removed *to_nuc*, the hit can include the subject end/start and evalue
Returns an instance of :class:`Annotation`
Arguments:
hit (tuple): a BLAST hit, from :func:`mgkit.io.blast.parse_blast_tab`
db (str): db used with BLAST
Keyword Args:
feat_type (str): feature type in the GFF
seq_len (int): sequence length; if supplied, the phase for strand '-'
can be assigned (otherwise 0 is assigned) and the query sequence
coverage is added
**kwd: any additional column
Returns:
Annotation: instance of :class:`Annotation`
"""
seq_id = hit[0]
gene_id = hit[1]
strand = '+'
identity = hit[2]
bitscore = hit[-1]
start = hit[3]
end = hit[4]
phase = 0
if start > end:
start, end = end, start
strand = '-'
if seq_len is not None:
phase = (seq_len - end) % 3
if seq_len is not None:
query_coverage = (end - start + 1.) / seq_len * 100
kwd['query_coverage'] = query_coverage
if strand == '+':
phase = (start - 1) % 3
annotation = Annotation(
seq_id=seq_id,
source='BLAST',
feat_type=feat_type,
start=start,
end=end,
score=bitscore,
strand=strand,
phase=phase,
db=db,
gene_id=gene_id,
identity=identity,
bitscore=bitscore,
frame="{}{}".format('f' if strand == '+' else 'r', phase),
**kwd
)
# the hit includes subject end/start and evalue, as per new version of
# mgkit.io.blast.parse_uniprot_blast
if len(hit) == 9:
annotation.attr['evalue'] = hit[-2]
annotation.attr['subject_end'] = hit[-3]
annotation.attr['subject_start'] = hit[-4]
return annotation
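# A hedged sketch of building an annotation from a nucleotide BLAST hit; the
# tuple below mimics the (query, subject, identity, start, end, ..., bitscore)
# layout used above and its values are illustrative.
#
#   >>> hit = ('seq1', 'NC_000001', 98.5, 1, 300, 57.8)
#   >>> ann = from_nuc_blast(hit, 'NCBI-NT', seq_len=300)
#   >>> ann.gene_id, ann.strand, ann.phase
#   ('NC_000001', '+', 0)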
def from_json(line):
"""
.. versionadded:: 0.2.1
Returns an Annotation from a json representation
"""
return Annotation(**json.loads(line))
def from_hmmer(line, aa_seqs, feat_type='gene', source='HMMER',
db='CUSTOM', custom_profiles=True, noframe=False):
"""
.. versionadded:: 0.1.15
first implementation to move old scripts to new GFF specs
.. versionchanged:: 0.2.1
removed compatibility with old scripts
.. versionchanged:: 0.2.2
taxon_id and taxon_name are not saved for non-custom profiles
.. versionchanged:: 0.3.1
added support for non mgkit-translated sequences (*noframe*)
Parses HMMER results (one line); it won't parse commented lines (starting
with *#*)
Arguments:
line (str): HMMER domain table line
aa_seqs (dict): dictionary with amino-acid sequences (name->seq),
used to get the correct nucleotide positions
feat_type (str): string to be used in the 'feature type' column
source (str): string to be used in the 'source' column
custom_profiles (bool): if True, the profile name contains gene,
taxonomy and reviewed information in the form
KOID_TAXONID_TAXON-NAME(-nr)
noframe (bool): if True, the sequence is assumed to be in frame f0
Returns:
A :class:`Annotation` instance
.. note::
if `custom_profiles` is False, gene_id, taxon_id and taxon_name will
be equal to the profile name
"""
if isinstance(line, bytes):
line = line.decode('ascii')
line = line.split()
if noframe:
# no information on the frame is provided (already a protein, so f0)
frame = 'f0'
contig = line[0]
else:
contig, frame = line[0].rsplit('-', 1)
t_from = int(line[17])
t_to = int(line[18])
# first get coordinate if sequence is reversed
if frame.startswith('r'):
seq_len = len(aa_seqs[line[0]])
t_from, t_to = seq_utils.reverse_aa_coord(t_from, t_to, seq_len)
# necessary only if frame information available
if not noframe:
# converts in nucleotide coordinates
t_from, t_to = seq_utils.convert_aa_to_nuc_coord(
t_from,
t_to,
frame=int(frame[-1])
)
# maintains the aa coordinates
aa_from = int(line[17])
aa_to = int(line[18])
profile_name = line[3]
score = float(line[6])
if custom_profiles:
# KOID_TAXONID_TAXON-NAME(-nr)
reviewed = 'False' if profile_name.endswith('-nr') else 'True'
gene_id, taxon_id, taxon_name = profile_name.split('_')
else:
gene_id = profile_name
taxon_id = taxon_name = None
annotation = Annotation(
seq_id=contig,
source=source,
feat_type=feat_type,
start=t_from,
end=t_to,
score=score,
strand='-' if frame.startswith('r') else '+',
phase=int(frame[1]),
db=db,
gene_id=gene_id,
taxon_id=taxon_id,
bitscore=float(line[7]),
# custom for HMMER profiles
aa_from=aa_from,
aa_to=aa_to,
# stores the aa sequence
aa_seq=aa_seqs[line[0]][aa_from - 1:aa_to],
# evalue
evalue=score,
# maintains HMMER profile information:
# profile name
name=profile_name,
# both strand/phase (e.g r2)
frame=frame,
# old version of uid
# ko_idx=ko_idx,
# used in other old profiles, where the taxon name was used instead
# of a taxon ID
taxon_name=taxon_name
)
try:
annotation.attr['reviewed'] = reviewed
except UnboundLocalError:
pass
# removes the None values from non-custom profiles
if taxon_id is None:
del annotation.attr['taxon_id']
if taxon_name is None:
del annotation.attr['taxon_name']
return annotation
def parse_gff(file_handle, gff_type=from_gff, strict=False, encoding='ascii', filter_func=None):
"""
.. versionchanged:: 0.5.7
added *filter_func*
.. versionchanged:: 0.4.0
In some cases ASCII decoding is not enough, so it is parametrised now
.. versionchanged:: 0.3.4
added decoding from binary for compatibility with Python3
.. versionchanged:: 0.2.6
added *strict* parameter
.. versionchanged:: 0.2.3
correct handling of GFF with comments or appended sequences
.. versionchanged:: 0.1.12
added *gff_type* parameter
Parses a GFF file and returns a generator of :class:`Annotation` instances.
Accepts a file handle or a string with the file name
Arguments:
file_handle (str, file): file name or file handle to read from
gff_type (class): class/function used to parse a GFF annotation
strict (bool): if True duplicate keys raise an exception
encoding (str): encoding of the file, if ascii fails, use utf8
filter_func (func): a function used to filter, accepts an annotation as
argument and returns True if the annotation is to be kept
Yields:
Annotation: an iterator of :class:`Annotation` instances
"""
file_handle = mgkit.io.open_file(file_handle, 'rb')
LOG.info(
"Loading GFF from file (%s)",
getattr(file_handle, 'name', repr(file_handle))
)
index = 0
for index, line in enumerate(file_handle):
try:
line = line.decode(encoding)
except UnicodeError:
raise UnicodeError("Impossible to decode line to {}: {}".format(encoding, line))
# the first is for GFF with comments and the second for
# GFF with the fasta file attached
if line.startswith('#'):
continue
if line.startswith('>'):
break
annotation = gff_type(line, strict=strict)
if filter_func is not None:
if not filter_func(annotation):
continue
yield annotation
LOG.info(
"Read %d line from file (%s)",
index + 1,
getattr(file_handle, 'name', repr(file_handle))
)
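# A short usage sketch for parse_gff with a filter function; the file name
# and the filtering criterion are illustrative.
#
#   from mgkit.io import gff
#   for annotation in gff.parse_gff('input.gff',
#                                   filter_func=lambda x: x.feat_type == 'CDS'):
#       print(annotation.uid, annotation.gene_id)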
def diff_gff(files, key_func=None):
"""
.. versionadded:: 0.1.12
Returns a simple diff made between a list of GFF files. The annotations are
grouped using *key_func*, so finding similar annotations depends on it.
Arguments:
files (iterable): an iterable of file handles, pointing to GFF files
key_func (func): function used to group annotations, defaults to this
key: *(x.seq_id, x.strand, x.start, x.end, x.gene_id, x.bitscore)*
Returns:
dict: the keys of the returned dictionary are determined by key_func
and the values are lists. The list elements are tuples whose first
element is the index of the file (relative to *files*) and the second
element is the line number of the annotation. Can be used with the
:mod:`linecache` module.
"""
if isinstance(files, str) or len(files) == 1:
return
if key_func is None:
def key_func(x):
return (x.seq_id, x.strand, x.start, x.end, x.gene_id, x.bitscore)
gff_diff = {}
for index, file_handle in enumerate(files):
for lineno, annotation in enumerate(parse_gff(file_handle)):
key = key_func(annotation)
try:
gff_diff[key].append((index, lineno))
except KeyError:
gff_diff[key] = [(index, lineno)]
return gff_diff
def annotation_elongation(ann1, annotations):
"""
.. versionadded:: 0.1.12
Given an :class:`Annotation` instance and a list of the instances of the
same class, returns the longest overlapping range that can be found and the
annotations that are included in it.
.. warning::
annotations are not checked for seq_id and strand
Arguments:
ann1 (Annotation): annotation to elongate
annotations (iterable): iterable of :class:`Annotation` instances
Returns:
tuple: the first element is the longest range found, while the
second element is a set with the annotations used
"""
used = set([ann1])
union = (ann1.start, ann1.end)
for ann2 in annotations:
new_union = union_range(union[0], union[1], ann2.start, ann2.end)
if new_union is not None:
used.add(ann2)
union = new_union
return union, used
def elongate_annotations(annotations):
"""
.. versionadded:: 0.1.12
Given an iterable of :class:`Annotation` instances, tries to find all the
possible longest ranges and returns them.
.. warning::
annotations are not checked for seq_id and strand
Arguments:
annotations (iterable): iterable of :class:`Annotation` instances
Returns:
set: set with all the ranges found
"""
annotations = sorted(annotations, key=lambda x: x.start)
ranges = set()
while len(annotations) > 0:
ann1 = annotations.pop(0)
union, used = annotation_elongation(ann1, annotations)
if union is None:
ranges.add((ann1.start, ann1.end))
else:
annotations = sorted(set(annotations) - used,
key=lambda x: x.start)
ranges.add(union)
return ranges
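# A brief sketch of elongate_annotations merging overlapping ranges; the
# coordinates are illustrative.
#
#   >>> ann = [Annotation(seq_id='seq1', start=1, end=10),
#   ...        Annotation(seq_id='seq1', start=5, end=20),
#   ...        Annotation(seq_id='seq1', start=30, end=40)]
#   >>> sorted(elongate_annotations(ann))
#   [(1, 20), (30, 40)]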
def annotation_coverage(annotations, seqs, strand=True):
"""
.. versionadded:: 0.1.12
Given a list of annotations and a dictionary where the keys are the
sequence names referred in the annotations and the values are the sequences
themselves, yields for each key a number which indicates how much of the
sequence length is "covered" by annotations. If *strand* is True the
coverage is strand specific.
Arguments:
annotations (iterable): iterable of :class:`Annotation` instances
seqs (dict): dictionary in which the keys are the sequence names and
the values are the sequences
strand (bool): if True, the values are strand specific (the
annotations) are grouped by (seq_id, strand) instead of seq_id
Yields:
tuple: the first element is the key, (seq_id, strand) if *strand* is
True or seq_id if *strand* is False, and the coverage is the second
value.
"""
if strand:
def key_func(x): return (x.seq_id, x.strand)
else:
def key_func(x): return x.seq_id
annotations = group_annotations(
annotations,
key_func=key_func
)
for key, key_ann in viewitems(annotations):
if isinstance(key, str):
seq_len = len(seqs[key])
else:
seq_len = len(seqs[key[0]])
covered = ranges_length(elongate_annotations(key_ann))
yield key, covered / seq_len * 100
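# A minimal sketch of annotation_coverage; the sequences and coordinates are
# illustrative.
#
#   >>> seqs = {'seq1': 'A' * 100}
#   >>> ann = [Annotation(seq_id='seq1', strand='+', start=1, end=50)]
#   >>> dict(annotation_coverage(ann, seqs, strand=False))
#   {'seq1': 50.0}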
def annotation_coverage_sorted(annotations, seqs, strand=True):
"""
.. versionadded:: 0.3.1
Given a list of annotations and a dictionary where the keys are the
sequence names referred in the annotations and the values are the sequences
themselves, yields for each key a number which indicates how much of the
sequence length is "covered" by annotations. If *strand* is True the
coverage is strand specific.
.. note::
It differs from :func:`annotation_coverage` because it assumes the
annotations are correctly sorted and because of the values it yields
Arguments:
annotations (iterable): iterable of :class:`Annotation` instances
seqs (dict): dictionary in which the keys are the sequence names and
the values are the sequences
strand (bool): if True, the values are strand specific (the
annotations) are grouped by (seq_id, strand) instead of seq_id
Yields:
tuple: the first element is the seq_id, the second the strand (if
strand is True, else it's set to *None*), and the third element is the
coverage.
"""
if strand:
def key_func(x): return (x.seq_id, x.strand)
else:
def key_func(x): return x.seq_id
annotations = group_annotations_sorted(
annotations,
key_func=key_func
)
for ann in annotations:
seq_id = ann[0].seq_id
if strand:
ann_strand = ann[0].strand
else:
ann_strand = None
seq_len = len(seqs[seq_id])
covered = ranges_length(elongate_annotations(ann))
yield seq_id, ann_strand, covered / seq_len * 100
def group_annotations(annotations, key_func=lambda x: (x.seq_id, x.strand)):
"""
.. versionadded:: 0.1.12
Group :class:`Annotation` instances in a dictionary by using a key function
that returns the key to be used in the dictionary.
Arguments:
annotations (iterable): iterable with :class:`Annotation` instances
key_func (func): function used to extract the key used in the
dictionary, defaults to a function that returns
(ann.seq_id, ann.strand)
Returns:
dict: dictionary whose keys are returned by *key_func* and the values
are lists of annotations
Example:
>>> ann = [Annotation(seq_id='seq1', strand='+', start=10, end=15),
... Annotation(seq_id='seq1', strand='+', start=1, end=5),
... Annotation(seq_id='seq1', strand='-', start=30, end=100)]
>>> group_annotations(ann)
{('seq1', '+'): [seq1(+):10-15, seq1(+):1-5], ('seq1', '-'): [seq1(-):30-100]}
"""
grouped = {}
for annotation in annotations:
key = key_func(annotation)
try:
grouped[key].append(annotation)
except KeyError:
grouped[key] = [annotation]
return grouped
def group_annotations_sorted(annotations,
key_func=lambda x: (x.seq_id, x.strand)):
"""
.. versionadded:: 0.1.13
Group :class:`Annotation` instances by using a key function that returns a
key. Assumes that the annotations are already sorted to return an iterator
and save memory. One way to sort them is using: `sort -s -k 1,1 -k 7,7` on
the file.
Arguments:
annotations (iterable): iterable with :class:`Annotation` instances
key_func (func): function used to extract the key used in the
dictionary, defaults to a function that returns
(ann.seq_id, ann.strand)
Yields:
list: a list of the grouped annotations by *key_func* values
"""
curr_key = ''
curr_ann = []
for annotation in annotations:
new_key = key_func(annotation)
if curr_key == new_key:
curr_ann.append(annotation)
else:
if curr_key == '':
curr_ann.append(annotation)
curr_key = new_key
else:
yield curr_ann
curr_key = new_key
curr_ann = [annotation]
else:
yield curr_ann
def group_annotations_by_ancestor(annotations, ancestors, taxonomy):
"""
.. versionadded:: 0.1.13
Group annotations by the ancestors provided.
Arguments:
annotations (iterable): annotations to group
ancestors (iterable): list of ancestors accepted
taxonomy: taxonomy class
Returns:
dict: grouped annotations
"""
ann_dict = dict((ancestor, []) for ancestor in ancestors)
unknown = []
for annotation in annotations:
anc_found = False
for ancestor, anc_ids in viewitems(ancestors):
if taxonomy.is_ancestor(annotation.taxon_id, anc_ids):
ann_dict[ancestor].append(annotation)
anc_found = True
break
if not anc_found:
unknown.append(annotation)
return ann_dict, unknown
def split_gff_file(file_handle, name_mask, num_files=2, encoding='ascii'):
"""
.. versionadded:: 0.1.14
.. versionchanged:: 0.2.6
now accept a file object as sole input
.. versionchanged:: 0.4.0
added *encoding* parameter
Splits a GFF, or a list of them, into a number of files. It is assured that
annotations for the same sequence are kept in the same file, which is
useful for cases like filtering, even when the annotations are from
different GFF files.
Internally, a structure is kept to check if a sequence ID has already been
stored to a file, in which case the annotation is written to that file,
otherwise a random file handle (among the open ones) is chosen.
Arguments:
file_handle (str, list): a single or list of file handles (or file
names), from which the GFF annotations are read
name_mask (str): a string used as template for the output file names
on which the function applies :func:`string.format`
num_files (int): the number of files to split the records
Example:
>>> import glob
>>> files = glob.glob('*.gff')
>>> name_mask = 'split-file-{0}.gff'
>>> split_gff_file(files, name_mask, 5)
"""
if not isinstance(file_handle, IOBase):
if isinstance(file_handle, str):
file_handle = [file_handle]
file_handle = itertools.chain(
*(mgkit.io.open_file(x, 'rb') for x in file_handle)
)
out_handles = [
mgkit.io.open_file(name_mask.format(filen), 'wb')
for filen in range(num_files)
]
seq_ids = {}
for line in file_handle:
line = line.decode(encoding)
if line.startswith('#'):
continue
if line.startswith('>'):
break
seq_id = line.split('\t')[0]
try:
out_handle = out_handles[seq_ids[seq_id]]
except KeyError:
new_index = random.randint(0, num_files - 1)
seq_ids[seq_id] = new_index
out_handle = out_handles[new_index]
out_handle.write(line.encode('ascii'))
def load_gff_base_info(files, taxonomy=None, exclude_ids=None,
include_taxa=None, encoding='ascii'):
"""
This function is useful if the number of annotations in a GFF is high or
there are memory constraints on the system. It returns a dictionary that
can be used with functions like
:func:`mgkit.counts.func.load_sample_counts`.
Arguments:
files (iterable, str): file name or list of paths of GFF files
taxonomy: taxonomy pickle file, needed if include_taxa is not None
exclude_ids (set, list): a list of gene_id to exclude from the
dictionary
include_taxa (int, iterable): a taxon_id or list thereof to be passed
to :meth:`mgkit.taxon.taxonomy.is_ancestor`, so only the taxa that
have those taxon_id(s) as ancestor(s) are kept
encoding (str): passed to :func:`parse_gff`
Returns:
dict: dictionary where the key is :attr:`Annotation.uid` and the value
is a tuple (:attr:`Annotation.gene_id`, :attr:`Annotation.taxon_id`)
"""
if isinstance(files, str):
files = [files]
infos = {}
for fname in files:
for annotation in parse_gff(fname, encoding=encoding):
# no information on taxa - exclude
if annotation.taxon_id is None:
continue
# to exclude ribosomial genes or any other kind
if exclude_ids is not None:
if annotation.gene_id in exclude_ids:
continue
if (include_taxa is not None) and (taxonomy is not None):
if not taxonomy.is_ancestor(annotation.taxon_id, include_taxa):
continue
infos[annotation.uid] = (annotation.gene_id, annotation.taxon_id)
return infos
def load_gff_mappings(files, map_db, taxonomy=None, exclude_ids=None,
include_taxa=None, encoding='ascii'):
"""
This function is useful if the number of annotations in a GFF is high or
there are memory constraints on the system. It returns a dictionary that
can be used with functions like
:func:`mgkit.counts.func.load_sample_counts`.
Arguments:
files (iterable, str): file name or list of paths of GFF files
map_db (str): any kind of mapping in the GFF, as passed to
:meth:`Annotation.get_mapping`
taxonomy: taxonomy pickle file, needed if include_taxa is not None
exclude_ids (set, list): a list of gene_id to exclude from the
dictionary
include_taxa (int, iterable): a taxon_id or list thereof to be passed
to :meth:`mgkit.taxon.taxonomy.is_ancestor`, so only the taxa that
have those taxon_id(s) as ancestor(s) are kept
encoding (str): passed to :func:`parse_gff`
Returns:
dict: dictionary where the key is :attr:`Annotation.gene_id` and the
value is a list of mappings, as returned by
:meth:`Annotation.get_mapping`
"""
infos = {}
for fname in files:
for annotation in parse_gff(fname, encoding=encoding):
# skips genes that are already in the mapping
if annotation.gene_id in infos:
continue
# exclude genes with no taxonomic information
if annotation.taxon_id is None:
continue
if exclude_ids is not None:
if annotation.gene_id in exclude_ids:
continue
# skips non-bacterial/archaeal genes
if (include_taxa is not None) and (taxonomy is not None):
if not taxonomy.is_ancestor(annotation.taxon_id, include_taxa):
continue
infos[annotation.gene_id] = annotation.get_mapping(map_db)
return infos
def parse_gff_files(files, strict=True):
"""
.. versionadded:: 0.1.15
.. versionchanged:: 0.2.6
added *strict* parameter
Function that returns an iterator of annotations from multiple GFF files.
Arguments:
files (iterable, str): iterable of file names of GFF files, or a single
file name
strict (bool): if True duplicate keys raise an exception
Yields:
:class:`Annotation`: iterator of annotations
"""
if isinstance(files, str):
files = [files]
return itertools.chain(*(parse_gff(file_name, strict=strict) for file_name in files))
def get_annotation_map(annotations, key_func, value_func):
"""
.. versionadded:: 0.1.15
Applies two functions to an iterable of annotations, returning an iterator
of the results. Useful to build a dictionary
Arguments:
annotations (iterable): iterable of annotations
key_func (func): function that accept an annotation as argument and
returns one value, the first of the returned tuple
value_func (func): function that accept an annotation as argument and
returns one value, the second of the returned tuple
Yields:
tuple: a tuple where the first value is the result of *key_func* on
the passed annotation and the second is the value returned by
*value_func* on the same annotation
"""
for annotation in annotations:
yield key_func(annotation), value_func(annotation)
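# A short sketch of building a gene_id -> taxon_id dictionary with
# get_annotation_map; the attribute values are illustrative.
#
#   >>> ann = [Annotation(gene_id='K00001', taxon_id=2),
#   ...        Annotation(gene_id='K00002', taxon_id=839)]
#   >>> dict(get_annotation_map(ann, lambda x: x.gene_id, lambda x: x.taxon_id))
#   {'K00001': 2, 'K00002': 839}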
def convert_gff_to_gtf(file_in, file_out, gene_id_attr='uid'):
"""
.. versionadded:: 0.1.16
Function that uses :meth:`Annotation.to_gtf` to convert a GFF into GTF.
Arguments:
file_in (str, file): either file name or file handle of a GFF file
file_out (str): file name to which write the converted annotations
"""
LOG.info("Writing GTF file to %s", file_out)
file_out = open(file_out, 'w')
for annotation in parse_gff(file_in):
file_out.write(annotation.to_gtf())
def from_mongodb(record, lineage=True):
"""
.. versionadded:: 0.2.1
.. versionchanged:: 0.2.2
added handling of *counts_* and *fpkms_*
.. versionchanged:: 0.2.6
better handling of missing attributes and added *lineage* parameter
Returns an :class:`Annotation` instance from a MongoDB record (created
using :meth:`Annotation.to_mongodb`). The actual record returned by pymongo
is a dictionary that is copied, manipulated and passed to the
:meth:`Annotation.__init__`.
Arguments:
record (dict): a dictionary with the full record from a MongoDB query
lineage (bool): indicates if the lineage information in the record
should be kept in the annotation
Returns:
Annotation: instance of :class:`Annotation` object
"""
record = record.copy()
# to_mongodb stores the uid both as 'uid' and as '_id'; keep uid, drop _id
record['uid'] = record['_id']
del record['_id']
if 'map' in record:
mappings = record['map'].copy()
del record['map']
try:
record['EC'] = ','.join(mappings['ec'])
del mappings['ec']
except KeyError:
pass
try:
for key in mappings:
record['map_{}'.format(key.upper())] = ','.join(mappings[key])
except KeyError:
pass
try:
counts = record['counts'].copy()
del record['counts']
for key in counts:
record['counts_{}'.format(key)] = counts[key]
except KeyError:
pass
try:
fpkms = record['fpkms'].copy()
del record['fpkms']
for key in fpkms:
record['fpkms_{}'.format(key)] = fpkms[key]
except KeyError:
pass
if ('lineage' in record) and (not lineage):
del record['lineage']
return Annotation(**record)
def from_prodigal_frag(main_gff, blast_gff, attr='ID', split_func=None):
"""
.. versionchanged:: 0.3.3
fixed a bug for the strand, also the code is tested
.. versionadded:: 0.2.6
*experimental*
Reads the GFF given in output by PRODIGAL and the resulting GFF from using
BLAST (or other software) on the aa or nucleotide file output by PRODIGAL.
It then integrates the two outputs, adding to the PRODIGAL GFF the
information from the output of the gene prediction software used.
Arguments:
main_gff (file): GFF file from PRODIGAL
blast_gff (file): GFF with the returned annotations
attr (str): attribute in the PRODIGAL GFF that is used to identify an
annotation
split_func (func): function to rename the headers from the predicted
sequences back to their parent sequence
Yields:
annotation: annotation for each *blast_gff* back translated
"""
if split_func is None:
def split_func(x): return tuple(x.rsplit('_', 1))
prodigal_gff = {}
for annotation in parse_gff(main_gff, strict=False):
key = (
annotation.seq_id,
split_func(annotation.get_attr(attr))[1]
)
prodigal_gff[key] = (
annotation.start,
annotation.end,
annotation.strand,
annotation.get_attr(attr)
)
for annotation in parse_gff(blast_gff):
key = split_func(annotation.seq_id)
annotation.set_attr('prodigal_start', annotation.start)
annotation.set_attr('prodigal_end', annotation.end)
annotation.set_attr('prodigal_strand', annotation.strand)
start, end, strand, p_id = prodigal_gff[key]
annotation.seq_id = key[0]
annotation.start = start
annotation.end = end
annotation.strand = strand
annotation.set_attr('prodigal_ID', p_id)
yield annotation