Module containing functions related to sequence data
.. note::
For those functions without a docstring, look at the same with a
underscore ('_') prepended.
from __future__ import division # add check to use only on python 2.x
from builtins import range, zip
from future.utils import viewitems, viewvalues
import itertools
import logging
import random
import collections
import numpy
import pandas
from scipy import stats
from scipy import interpolate
import statsmodels.api as sm
from ..utils.common import between
from .trans_tables import UNIVERSAL
from ..io import fasta
from ._sequence import get_kmers, sliding_window, sequence_signature, \
LOG = logging.getLogger(__name__)
"""Translation table - Universal genetic code"""
'A': 'T',
'T': 'A',
'C': 'G',
'G': 'C'
"Dictionary containing the complement of each nucleotide sequence"
[docs]def make_reverse_table(tbl=None):
Makes table to reverse complement a sequence by :func:`reverse_complement`.
The table used is the complement for each nucleotide, defaulting to
if tbl is None:
tbl = REV_COMP
trans_table = [chr(idx) for idx in range(256)]
for nuc, rev in viewitems(tbl):
trans_table[ord(nuc)] = rev
return ''.join(trans_table)
# The maketrans function is not available on Python3
REV_COMP_ASCII = u''.maketrans(REV_COMP)
[docs]def reverse_complement_old(seq, tbl=None):
Returns the reverse complement of a nucleotide sequence
:param str seq: nucleotide sequence with uppercase characters
:param dict tbl: dictionary of complement bases, like :data:`REV_COMP`
:return str: returns the reverse complement of a nucleotide sequence
if tbl is None:
tbl = REV_COMP
rev_complement = [tbl[x] for x in seq[::-1]]
return ''.join(rev_complement)
[docs]def reverse_complement(seq, tbl=REV_COMP_ASCII):
Returns the reverse complement of a nucleotide sequence
:param str seq: nucleotide sequence with uppercase characters
:param dict tbl: translation table returned by :func:`make_reverse_table`
:return str: returns the reverse complement of a nucleotide sequence
return seq[::-1].translate(tbl)
[docs]def translate_sequence(sequence, start=0, tbl=None, reverse=False):
Translate a nucleotide sequence in an amino acid one.
:param str sequence: sequence to translate, it's expected to be all caps
:param int start: 0-based index for the translation to start
:param dict tbl: dictionary with the translation for each codon
:param bool reverse: if True, :func:`reverse_complement` will be called and
the returned sequence translated
:return str: the translated sequence
if reverse:
sequence = reverse_complement(sequence)
if tbl is None:
trs = []
for idx in range(start, len(sequence), 3):
codon = sequence[idx:idx+3]
if len(codon) < 3:
except KeyError:
return ''.join(trs)
[docs]def put_gaps_in_nuc_seq(nuc_seq, aa_seq, trim=True):
Match the gaps in an amino-acid aligned sequence to its original nucleotide
sequence. If the nucleotide sequence is not a multiple of 3, the trim
option by default trim those bases from the output.
:param str nuc_seq: original nucleotide sequence
:param str aa_seq: aligned amino-acid sequence
:param bool trim: if True trim last nucleotide(s)
:return str: gapped nucleotide sequence
# transform a string into a list of codons, then inserts the gaps
nuc_alg = [nuc_seq[idx:idx+3] for idx in range(0, len(nuc_seq), 3)]
for idx, aa_pos in enumerate(aa_seq):
if aa_pos == '-':
nuc_alg.insert(idx, '-' * 3)
nuc_alg = ''.join(nuc_alg)
if trim and (len(nuc_alg) != len(aa_seq) * 3):
trim_idx = len(nuc_seq) % 3
if trim_idx > 0:
nuc_alg = nuc_alg[:-trim_idx]
return nuc_alg
[docs]def get_syn_matrix(trans_table=None, nuc_list=None):
Returns a dictionary containing the expected count of synonymous and
non-synonymous changes that a codon can have if one base is allowed to
change at a time.
There are 9 possible changes per codon.
:param dict trans_table: a tranlation table, defaults to
:param iterable nuc_list: a list of nucleotides in which a base can change,
default to the keys of :data:`seq_utils.REV_COMP`
:return dict: returns a dictionary in which for each codon a dictionary
{'syn': 0, 'nonsyn': 0} holds the number of expected changes
if trans_table is None:
trans_table = TRANS_TABLE
if nuc_list is None:
nuc_list = REV_COMP
syn_matrix = dict(
(codon, {'syn': 0, 'nonsyn': 0})
for codon in trans_table
for codon1 in trans_table:
for frame in range(0, 3):
for nuc in nuc_list:
codon2 = list(codon1)
codon2[frame] = nuc
codon2 = ''.join(codon2)
# skips if its the same
if codon1 == codon2:
if trans_table[codon1] == trans_table[codon2]:
syn_matrix[codon1]['syn'] += 1
syn_matrix[codon1]['nonsyn'] += 1
return syn_matrix
[docs]def get_syn_matrix_all(trans_table=None):
Same as :func:`get_syn_matrix` but a codon can change in any of the ones
included in trans_table.
There are 63 possible changes per codon.
if trans_table is None:
trans_table = TRANS_TABLE
syn_matrix = dict(
(codon, {'syn': 0, 'nonsyn': 0})
for codon in trans_table
for codon1, codon2 in itertools.permutations(trans_table, 2):
if trans_table[codon1] == trans_table[codon2]:
syn_matrix[codon1]['syn'] += 1
syn_matrix[codon1]['nonsyn'] += 1
return syn_matrix
[docs]def get_seq_expected_syn_count(seq, start=0, syn_matrix=None):
.. versionchanged:: 0.4.4
if codon contains *N* or is not a multiple of 3, it is skipped
Calculate the expected number of synonymous and non-synonymous changes in a
nucleotide sequence. Assumes that the sequence is already in the correct
frame and its length is a multiple of 3.
:param iterable seq: nucleotide sequence (uppercase chars)
:param int start: frame of the sequence
:param dict syn_matrix: dictionary that contains the expected number of
changes for a codon, as returned by :func:`get_syn_matrix`
:return tuple: tuple with counts of expected counts (syn, nonsyn)
if syn_matrix is None:
syn_matrix = get_syn_matrix()
seq_syn = 0
seq_nonsyn = 0
for idx in range(start, len(seq), 3):
codon = seq[idx:idx+3]
# if an ambiguity *N* is found, skip the codon
if ('N' in codon) or (len(codon) != 3):
seq_syn += syn_matrix[codon]['syn']
seq_nonsyn += syn_matrix[codon]['nonsyn']
return seq_syn, seq_nonsyn
[docs]def get_variant_sequence(seq, *snps):
.. versionadded:: 0.1.16
Return a sequence changed in the positions requested.
seq (str): a sequence
*snps (tuple): each argument passed is a tuple with the first element
as a position in the sequence (1-based index) and the second
element is the character to substitute in the sequence
str: string with the changed characters
>>> get_variant_sequence('ACTGATATATGCGCGCATCT', (1, 'C'))
.. note::
It is used for nucleotide sequences, but it is valid to use any string
seq = list(seq)
for pos, change in snps:
seq[pos - 1] = change
return ''.join(seq)
[docs]def convert_aa_to_nuc_coord(start, end, frame=0):
Converts aa coordinates to nucleotidic ones. The coordinates must be from
'+' strand. For the '-' strand, use :func:`reverse_aa_coord` first.
start (int): start of the annotation (lowest number)
end (int): end of the annotation (highest number)
frame (int): frame of the AA translation (0, 1 or 2)
tuple: the first element is the converted *start* and the second
element is the converted *end*
.. note::
the coordinates are assumed to be 1-based indices
start = (start - 1) * 3 + 1 # gets the first base of the codon
end = (end - 1) * 3 + 3 # gets the third base of the codon
return (start + frame, end + frame)
[docs]def reverse_aa_coord(start, end, seq_len):
Used to reverse amino-acid coordinates when parsing an AA annotation on
the - strand. Used when the BLAST or HMMER annotations use AA sequences.
start (int): start of the annotation
end (int): end of the annotation
seq_len (int): aa sequence length
tuple: reversed (from strand - to strand +) coordinates. The first
element is the converted *start* and the second element is the
converted *end*
.. note::
* start and end are 1-based indices
return (seq_len - end + 1, seq_len - start + 1)
[docs]def calc_n50(seq_lengths):
Calculate the N50 statistics for a :class:`numpy.array` of sequence
The algorithm finds in the supplied array the element (contig length) for
which the sum all contig lengths equal or greater than it is equal to half
of all assembled base pairs.
:param array seq_lengths: an instance of a numpy array containing the
sequence lengths
:return int: the N50 statistics value
LOG.info("Calculating N50 for assembly")
bp_half = seq_lengths.sum() / 2.0
bp_sum = 0
for seq_length in seq_lengths[::-1]:
bp_sum += seq_length
if bp_sum > bp_half:
return seq_length
[docs]class Alignment(object):
"Simple alignment class"
_names = None
_seqs = None
def __init__(self, seqs=None):
self._names = []
self._seqs = []
if seqs:
[docs] def add_seq(self, name, seq):
Add a sequence to the alignment
:param str name: name of the sequence
:param str seq: sequence
[docs] def add_seqs(self, seqs):
Add sequences to the alignment
:param iterable seqs: iterable that returns (name, seq)
for name, seq in seqs:
self.add_seq(name, seq)
[docs] def get_position(self, pos):
Get all characters at a position
:param int pos: position to return (0-based)
:return str: all characters occuring at the position
return ''.join(seq[pos] for seq in self._seqs)
[docs] def get_seq_len(self):
"Get the length of the alignment"
return len(self._seqs[0])
[docs] def get_consensus(self, nucl=True):
.. versionchanged:: 0.1.16
added *nucl* parameter
The consensus sequence is constructed by checking the nucleotide that
has the maximum number of counts for each position in the alignment.
nucl (bool): specify if the alignment is nucleotidic
str: consensus sequence
cons_seq = []
nucs = REV_COMP.keys()
for pos in range(0, self.get_seq_len()):
site = self.get_position(pos)
if nucl:
nuc_pos = max(
((site.count(nuc), nuc) for nuc in nucs),
key=lambda x: x[0]
nuc_pos = max(collections.Counter(x for x in site if x != '-'))
return ''.join(cons_seq)
def __iter__(self):
for name, seq in zip(self._names, self._seqs):
yield name, seq
[docs] def get_snps(self, ref_seq=None, full_size=False):
A SNP is called for the nucleotide that has the most counts among the
ones that differ in the each site of the alignment. If two nucleotides
have the same maximum count, one is randomly chosen.
:param str ref_seq: a reference sequence can be provided, if None, a
consensus sequence is produced for the alignment
:param bool full_size: if True a tuple is returned for each position in
the alignment. If there is no SNP at a position the value for the
SNP is None
:return list: a list of tuples (position, SNP)
if ref_seq is None:
ref_seq = self.get_consensus()
nucs = REV_COMP.keys()
snps = []
for pos in range(0, self.get_seq_len()):
ref_nuc = ref_seq[pos]
alts = self.get_position(pos).replace(ref_nuc, '').replace('-', '')
if not alts:
if full_size:
snps.append((pos, None))
alts = sorted(
((alts.count(nuc), nuc) for nuc in nucs),
key=lambda x: x[0]
counts = [alt[0] for alt in alts]
count_idx = counts.index(counts[-1])
alt = random.choice(alts[count_idx:])
snps.append((pos, alt[1]))
return snps
[docs]def get_seq_number_of_syn(ref_seq, snps, start=0, trans_table=None):
Given a reference sequence and a list of SNPs, calculates the number of
synonymous and non-synonymous SNP.
:param str ref_seq: reference sequence
:param iterable snps: list of tuples (position, SNP) - zero based index
:param int start: the frame used for the reference {0, 1, 2}
:param dict trans_table: translation table used - codon->AA
:return tuple: synonymous and non-synonymous counts
if trans_table is None:
trans_table = TRANS_TABLE
syn = 0
nonsyn = 0
for idx in range(start, len(ref_seq), 3):
ref_codon = ref_seq[idx:idx+3]
end = idx + 2
for pos, change in snps:
if not between(pos, idx, end):
change_pos = pos-idx
codon = ref_codon[:change_pos] + change + ref_codon[change_pos+1:]
if len(codon) < 3:
if trans_table[codon] == trans_table[ref_codon]:
syn += 1
nonsyn += 1
return syn, nonsyn
[docs]def check_snp_in_seq(ref_seq, pos, change, start=0, trans_table=None):
Check a SNP in a reference sequence if it is a synonymous or non-synonymous
:param str ref_seq: reference sequence
:param int pos: SNP position - it is expected to be a 1 based index
:param str change: nucleotide change occuring at *pos*
:param int start: the starting position for the coding region - 0 based
:param dict trans_table: translation table used - codon->AA
:return bool: True if it is a synonymous change, False if non-synonymous
# pos is expected to be a 1 based index
# returns true if snp is synonymous
# start is expected to be 0 based
if trans_table is None:
trans_table = TRANS_TABLE
codon_idx = ((pos - start) // 3) - (1 if (pos - start) % 3 == 0 else 0)
codon_pos = codon_idx * 3 + start
ref_codon = ref_seq[codon_pos:codon_pos + 3]
snp_codon = list(ref_codon)
snp_codon[(pos % 3) - 1] = change
snp_codon = ''.join(snp_codon)
return trans_table[snp_codon] == trans_table[ref_codon]
[docs]def sequence_gc_ratio(sequence):
.. versionadded:: 0.1.13
Calculate GC ratio information for a sequence. The formula is:
.. math::
:label: gc_ratio
\\frac {(A + T)}{(G + C)}
sequence (str): sequence
float: GC ratio, or `numpy.nan` if G = C = 0
at_sum = (sequence.count('A') + sequence.count('T'))
gc_sum = (sequence.count('G') + sequence.count('C'))
return at_sum / gc_sum
except ZeroDivisionError:
return numpy.nan
[docs]def sequence_gc_content(sequence):
.. versionchanged:: 0.3.3
in case of `ZeroDivisionError` returns .5
.. versionadded:: 0.1.13
Calculate GC content information for an annotation. The formula is:
.. math::
:label: gc_content
\\frac {(G + C)}{(G + C + A + T)}
sequence (str): sequence
float: GC content
at_sum = (sequence.count('A') + sequence.count('T'))
gc_sum = (sequence.count('G') + sequence.count('C'))
return gc_sum / (gc_sum + at_sum)
except ZeroDivisionError:
return .5
[docs]def sequence_composition(sequence, chars=tuple(REV_COMP)):
.. versionadded:: 0.1.13
Returns the number of occurences of each unique character in the sequence
sequence (str): sequence
chars (iterable, None): iterable of the chars to test, default to
(A, C, T, G). if None checks all unique characters in the sequencce
tuple: the first element is the nucleotide and the second is the number
of occurences in *sequence*
if chars is None:
chars = set(sequence)
for nuc in chars:
yield nuc, sequence.count(nuc)
[docs]def get_contigs_info(file_name, pp=False):
.. versionchanged:: 0.2.4
file_name can be a *dict* name->seq or a list of sequences
.. versionadded:: 0.2.1
Given a file name for a fasta file with sequences, a dictionary of
name->seq, or a list of sequences, returns the following information in a
tuple, or a string if *pp* is True:
- number of sequences
- total base pairs
- max length
- min length
- average length
- N50 statistic
file_name (str): fasta file to open
pp (bool): if True, a formatted string is returned
str, tuple: the returned value depends on the value of *pp*, if True a
formatted string is returned, otherwise the tuple with all values is.
if isinstance(file_name, dict):
seqs = list(viewvalues(file_name))
file_name = 'dictionary'
elif isinstance(file_name, list):
seqs = list(file_name)
file_name = 'list'
seqs = list(seq for name, seq in fasta.load_fasta(file_name))
lengths = numpy.array([len(x) for x in seqs])
info = (
if pp:
info_str = '{}: {} contigs, {} bp, max {}, min {}, avg {:.2f}, n50 {}'
info = info_str.format(
file_name, *info
return info
[docs]def _sliding_window(seq, size, step=None):
.. versionadded:: 0.2.6
Returns a generator, with every iteration yielding a subsequence of size
*size*, with a step of *step*.
seq (str): sequnece
size (int): size of the sliding window
step (int, None): the step to use in the sliding window. If *None*,
half of the sequence length is used
str: a subsequence of size *size* and step *step*
for index in range(0, len(seq) - size + 1, size // 2 if step is None else step):
yield seq[index:index+size]
[docs]def _get_kmers(seq, k):
.. versionadded:: 0.2.6
Returns a generator, with every iteration yielding a kmer of size *k*
seq (str): sequence
k (int): kmer size
str: a portion of *seq*, of size *k* with a step of *1*
for index in range(0, len(seq) - k + 1):
yield seq[index:index+k]
[docs]def _sequence_signature(seq, w_size, k_size=4, step=None):
.. versionadded:: 0.2.6
Returns the signature of a sequence, based on a kmer length, over a sliding
window. Each sliding window signature is placed in order into a list, with
each element being a :class:`collections.Counter` instance whose keys are
the kmer found in that window.
seq (str): sequence for which to get the signature
w_size (int): size of the sliding window size
k_size (int): size of the kmer to use :func:`get_kmers`
step (int): step to use in :func:`sliding_window`
list: a list of :class:`collections.Counter` instances, for each
window used
kmer_counts = []
for subseq in _sliding_window(seq, w_size, step):
collections.Counter(kmer for kmer in _get_kmers(subseq, k_size) if 'N' not in kmer)
return kmer_counts
[docs]def _signatures_matrix(seqs, w_size, k_size=4, step=None):
.. versionadded:: 0.2.6
Return a matrix (pandas.DataFrame) where the columns are the kmer found in
all sequences *seqs* and the rows are the a MultiIndex with the first level
being the sequnce name and the second the index of the sliding window for
which a signature was computed.
seqs (iterable): iterable that yields a tuple, with the first element
being the sequence name and the second the sequence itself
w_size (int): size of the sliding window size
k_size (int): size of the kmer to use :func:`get_kmers`
step (int): step to use in :func:`sliding_window`, defaults to half of
the window size
pandas.DataFrame: a DataFrame where the columns are the kmers and the
rows are the signatures of each contigs/windows.
def flatten_contigs(data):
for name, windows in viewitems(data):
for index, window in enumerate(windows):
yield (name, index), dict(window)
step = w_size // 2 if step is None else step
kmer_counts = {}
for name, seq in seqs:
sign = sequence_signature(seq, w_size, k_size, step=step)
kmer_counts[name] = sign
return pandas.DataFrame(
[docs]def random_sequences_codon(n=1, length=150, codons=list(UNIVERSAL.keys()),
p=None, frame=None):
.. versionadded:: 0.3.3
Returns an iterator of nucleotidic sequences, based on a defined genetic
code (passed as parameter, defaults to the universal one). The sequence if
first sampled with replacement from the codon list, with a number of codons
that covers the length chosen plus an additional one to allow a frame shift
as set by *frame*
.. note::
If the probability (for each codon) are supplied, the number of
sequences required to match those probabilities within a 10% margin of
error is of at least 10.000 sequences, for 5% at leas 100.000
n (int): number of sequences to yield
length (int): length of the sequences
codons (iterable): codons used when generating the sequences
p (tuple): probability of each codon occurence, in the same order as
frame (int or None): used to define a specific frame shift occuring in
the sequence (0 to 2) or a random one (if *None*)
str: string representing a nucleotidic sequence
sample_size = (length // 3) + (1 if length % 3 != 0 else 0) + 1
codons = numpy.array(codons)
if frame is not None:
if (frame < 0) or (frame > 2):
frame = 0
sframe = frame
for i in range(n):
if frame is None:
sframe = numpy.random.randint(0, 4)
yield ''.join(
numpy.random.choice(codons, size=sample_size, replace=True, p=p)
[docs]def random_sequences(n=1, length=150, p=None):
.. versionadded:: 0.3.3
Returns an iterator of random squences, where each nucleotide probability
can be customised in the order (A, C, T, G)
n (int): number of sequences to yield
length (int): length of each sequence
p (tuple): tuple with the probability of a nucleotide to occur, in the
order A, C, T, G
str: string representing a nucleotidic sequence
nucl = numpy.array(['A', 'C', 'T', 'G'])
for i in range(n):
yield ''.join(
numpy.random.choice(nucl, size=length, replace=True, p=p)
[docs]def qualities_model_decrease(length=150, scale=None, loc=35):
.. versionadded:: 0.3.3
The model is a decreasing one, from 35 and depends on the length of the
length (int): length of the qualities
scale (float): base level of the qualities
loc (float): loc parameter of the normal distribution
tuple: first element is sequence qualities, the second element contains
the distribution used to randomise them
if scale is None:
scale = 2. / (length / 150)
return loc - (numpy.log(numpy.arange(length) + 1) * (length / 150)), stats.norm.freeze(loc=0, scale=scale)
[docs]def qualities_model_constant(length=150, scale=1, loc=35):
.. versionadded:: 0.3.3
Model with constant quality
length (int): length of the qualities
scale (float): base level of the qualities
loc (float): loc parameter of the normal distribution
tuple: first element is sequence qualities, the second element contains
the distribution used to randomise them
return numpy.ones(length) * loc, stats.norm.freeze(scale=scale)
[docs]def random_qualities(n=1, length=150, model=None, max_qual=60, min_qual=0):
.. versionadded:: 0.3.3
.. versionchanged: 0.4.4
added *max_qual* and *min_qual* parameters, also the max quality value
output is 60 now
n (int): number of quality arrays to yield
length (int): length of the quality array
model (tuple): a tuple specifying the qualities and error distribution,
if *None* :func:`qualities_model_decrease` is used
max_qual (int): max quality values allowed, since rarely the quality
exceed 60 in sequencing data
numpy.array: numpy array of qualities, with the maximum value of 60
by default, or *max_qual*, same with *min_qual*
if model is None:
model = qualities_model_decrease(length=length)
base, dist = model
num_high_qual = 0
num_low_qual = 0
for x in range(n):
qual = numpy.round(base + dist.rvs(size=length)).astype(int)
if qual.max() > max_qual:
qual[qual > max_qual] = max_qual
num_high_qual += 1
if qual.min() < min_qual:
qual[qual < min_qual] = min_qual
num_low_qual += 1
yield qual
LOG.debug('Number of quality sequences over %d was %d out of %d', max_qual,
num_high_qual, n)
LOG.debug('Number of quality sequences below %d was %d out of %d',
min_qual, num_low_qual, n)