"""
.. versionadded:: 0.3.0
Scripts that includes some functionality to help use FASTA files with the
framework
split command
*************
Used to split a fasta file into smaller fragments
translate command
*****************
Used to translate nucleotide sequences into amino acids.
uid command
***********
Used to change a FASTA file headers to a unique ID. A table (tab separated)
with the changes made can be kept, using the *--table* option.
filter
******
Used to filter a FASTA file by length and also for sequence/header if a pattern is contained.
A list of headers to keep can be passed using the `-f` option.
info
****
Gets information about a FASTA file, prints seq_id (trimmed at first space), length
and hash (default sha1) and optionally the sequence, GC content and in GFF format if wanted.
rename
******
Renames the headers of a FASTA file, appending a random suffix and an optional prefix
Changes
*******
.. versionadded:: 0.3.0
.. versionchanged:: 0.3.1
added *translate* and *uid* command
.. versionchanged:: 0.3.4
ported to *click*
.. versionchanged:: 0.5.5
added option `-1` to output only the forward/frame0 and `-w` to avoid wrap at 60 chars to the *translate* command
.. versionchanged:: 0.5.7
added `filter` and `info` commands for simple fasta file filtering and info
"""
from __future__ import division
from builtins import range
import logging
from uuid import uuid4
import click
import hashlib
import pathlib
import string
import random
from tqdm import tqdm
import mgkit
from . import utils
from mgkit.io import fasta, gff
from ..utils import trans_tables
from ..utils.sequence import translate_sequence, sequence_gc_content
LOG = logging.getLogger(__name__)
@click.group()
@click.version_option()
@utils.cite_option
def main():
"Main function"
pass
@main.command('split', help="""Splits a FASTA file [fasta-file] in a number of
fragments""")
@click.option('-v', '--verbose', is_flag=True)
@click.option('-p', '--prefix', default='split', show_default=True,
help='Prefix for the file name in output')
@click.option('-n', '--number', type=click.INT, default=10, show_default=True,
help='Number of chunks into which split the FASTA file')
@click.option('-z', '--gzip', is_flag=True, default=False,
help='gzip output files')
@click.argument('fasta-file', type=click.File('rb'), default='-')
def split_command(verbose, prefix, number, gzip, fasta_file):
mgkit.logger.config_log(level=logging.DEBUG if verbose else logging.INFO)
LOG.info(
"Splitting FASTA into %d chunks with prefix (%s)",
number,
prefix
)
name_mask = "%s-{0:05}.fa" % prefix
if gzip:
name_mask += '.gz'
LOG.info("Output files will be compressed (gzip)")
fasta.split_fasta_file(
fasta_file,
name_mask,
num_files=number
)
[docs]def load_trans_table(table_name):
"Loads translation table "
return getattr(trans_tables, table_name.upper())
[docs]def translate_seq(name, seq, trans_table):
"Tranlates sequence into the 6 frames"
header = "{0}-{1}{2}"
for start in range(3):
yield header.format(name, 'f', start), translate_sequence(seq, start, trans_table, False)
yield header.format(name, 'r', start), translate_sequence(seq, start, trans_table, True)
@main.command('translate', help="""Translate FASTA file [fasta-file] in all 6
frames to [output-file]""")
@click.option('-v', '--verbose', is_flag=True)
@click.option('-t', '--trans-table', default='universal', show_default=True,
type=click.Choice([table_name.lower() for table_name in dir(trans_tables) if not table_name.startswith('_')]),
help='translation table')
@click.option('-1', '--one-seq', default=False, is_flag=True, show_default=True,
help='Only translate the sequence, instead of all 6 frames')
@click.option('-w', '--no-wrap', default=False, is_flag=True, show_default=True,
help='Make a sequence use only 1 line (2 including header)')
@click.option('--progress', default=False, is_flag=True,
help="Shows Progress Bar")
@click.argument('fasta-file', type=click.File('rb'), default='-')
@click.argument('output-file', type=click.File('wb'), default='-')
def translate_command(verbose, trans_table, one_seq, no_wrap, progress, fasta_file, output_file):
mgkit.logger.config_log(level=logging.DEBUG if verbose else logging.INFO)
if one_seq:
LOG.info("Assuming the sequences are in the correct frame")
LOG.info(
'Writing to file (%s)',
getattr(output_file, 'name', repr(output_file))
)
trans_table = load_trans_table(trans_table)
iterator = fasta.load_fasta(fasta_file)
if progress:
iterator = tqdm(iterator)
if no_wrap:
wrap = None
else:
wrap = 60
for name, seq in iterator:
if one_seq:
new_seq = translate_sequence(seq, 0, trans_table, False)
fasta.write_fasta_sequence(output_file, name, new_seq, wrap=wrap)
else:
for new_header, new_seq in translate_seq(name, seq, trans_table):
fasta.write_fasta_sequence(output_file, new_header, new_seq, wrap=wrap)
@main.command('uid', help="""Changes each header of a FASTA file [file-file] to
a uid (unique ID)""")
@click.option('-v', '--verbose', is_flag=True)
@click.option('-t', '--table', default=None, type=click.File('wb'),
help='Filename of a table to record the changes (by default discards it)')
@click.argument('fasta-file', type=click.File('rb'), default='-')
@click.argument('output-file', type=click.File('wb'), default='-')
def uid_command(verbose, table, fasta_file, output_file):
mgkit.logger.config_log(level=logging.DEBUG if verbose else logging.INFO)
if table is not None:
LOG.info(
'Writing Table to file (%s)',
getattr(table, 'name', repr(table))
)
LOG.info(
'Writing to file (%s)',
getattr(output_file, 'name', repr(output_file))
)
for name, seq in fasta.load_fasta(fasta_file):
uid = str(uuid4())
if table is not None:
table.write("{}\t{}\n".format(uid, name).encode('ascii'))
fasta.write_fasta_sequence(output_file, uid, seq)
@main.command('filter', help="""Filters a FASTA file [file-file]""")
@click.option('-v', '--verbose', is_flag=True)
@click.option('--len-gt', default=None, type=click.IntRange(min=1), help='Keeps sequences whose length is greater than')
@click.option('--len-lt', default=None, type=click.IntRange(min=1), help='Keeps sequences whose length is less than')
@click.option('--header-contains', default=None, type=click.STRING, help='Keeps sequences whose header contains the string')
@click.option('--seq-pattern', default=None, type=click.STRING, help='Keeps sequences that contains the string')
@click.option('-f', '--header-file', default=None, type=click.File('r'),
help='Keep only sequences contained in file list')
@click.option('-w', '--wrap', default=False, is_flag=True, help='Wraps the output sequences to 60 characters')
@click.option('-s', '--trim-tail', default=False, is_flag=True,
help='Removes header information after first space')
@click.argument('fasta-file', type=click.File('rb'), default='-')
@click.argument('output-file', type=click.File('wb'), default='-')
def filter_command(verbose, len_gt, len_lt, header_contains, seq_pattern, header_file, wrap,
trim_tail, fasta_file, output_file):
"""
.. versionadded:: 0.5.7
Filters a fasta file
"""
mgkit.logger.config_log(level=logging.DEBUG if verbose else logging.INFO)
if wrap:
wrap = 60
else:
wrap = None
if header_file is not None:
LOG.info('Reading header list from file %r', header_file)
header_file = set(
line.strip()
for line in header_file
)
count = 0
for name, seq in fasta.load_fasta(fasta_file):
seq_len = len(seq)
if len_gt is not None:
if not (seq_len > len_gt):
continue
if len_lt is not None:
if not (seq_len < len_lt):
continue
if trim_tail:
name = name.split(' ')[0]
if header_file is not None:
if name not in header_file:
continue
if header_contains is not None:
if header_contains not in name:
continue
if seq_pattern is not None:
if seq_pattern not in seq:
continue
fasta.write_fasta_sequence(output_file, name, seq, wrap=wrap)
count += 1
LOG.info('Kept %d sequence(s)', count)
@main.command('info', help="""Gets information of FASTA file [file-file]""")
@click.option('-v', '--verbose', is_flag=True)
@click.option('-h', '--header', is_flag=True, default=False,
help="Prints header")
@click.option('-s', '--include-seq', is_flag=True, default=False,
help="Includes the sequence")
@click.option('-r', '--no-rename', is_flag=True, default=False,
help="Do not split sequence name at first space")
@click.option('-a', '--hash-type', type=click.Choice(['sha1', 'md5', 'sha256']),
default='sha1', show_default=True)
@click.option('-g', '--out-gff', is_flag=True, default=False, show_default=True,
help='Outputs a GFF file')
@click.option('-gc', '--gc-content', is_flag=True, default=False, show_default=True,
help='Includes the GC Content')
@click.argument('fasta-file', type=click.File('rb'), default='-')
@click.argument('output-file', type=click.File('w'), default='-')
def info_command(verbose, header, include_seq, no_rename, hash_type, out_gff, gc_content, fasta_file, output_file):
"""
.. versionadded:: 0.5.7
Makes a tabular format file with hash, length and optionally the sequence. Alternatively, a GFF
can be the output.
"""
mgkit.logger.config_log(level=logging.DEBUG if verbose else logging.INFO)
hash_func = getattr(hashlib, hash_type)
if no_rename:
LOG.info("Sequence names will not be split at the first space")
load_func = fasta.load_fasta
else:
load_func = fasta.load_fasta_rename
if header and (not out_gff):
columns = ["seq_id", "length", "hash"]
if include_seq:
columns.append('sequence')
if gc_content:
columns.append('gc_content')
print(*columns, sep='\t', file=output_file)
for name, seq in load_func(fasta_file):
seq_hash = hash_func(seq.encode('ascii')).hexdigest()
if gc_content:
gc_value = sequence_gc_content(seq)
if out_gff:
annotation = gff.from_sequence(name, seq, seq_hash=seq_hash)
if gc_content:
annotation.set_attr("gc_content", gc_value)
output_file.write(annotation.to_gff())
else:
values = [name, len(seq), hash_func(seq.encode('ascii')).hexdigest()]
if include_seq:
values.append(seq)
if gc_content:
values.append(gc_value)
print(*values, sep='\t', file=output_file)
@main.command('rename', help="""Rename Sequence headers of FASTA file [file-file]
Adds 2 possible elements to the sequence header, separated by a
character 1) a suffix (random string of characters) and 2) a prefix
(optional).
The character used as separator should be a '|' (default), '#' or other
character that is not truncated in other software (space is).
In fact, this script will truncate the header at the first space
""")
@click.option('-v', '--verbose', is_flag=True)
@click.option('-p', '--prefix', default=None, type=click.STRING,
help="Adds a prefix to the header")
@click.option('-f', '--file-name', is_flag=True, default=False,
help="Adds filename as prefix (Useful for adding the file name")
@click.option('-s', '--separator', default='|', type=click.STRING,
help="Separator for the elements of the new header")
@click.option('-l', '--suffix-len', default=5, type=click.IntRange(0, 20),
help="Number of random characters to use")
@click.argument('fasta-file', type=click.File('rb'), default='-')
@click.argument('output-file', type=click.File('wb'), default='-')
def rename_command(verbose, prefix, file_name, separator, suffix_len, fasta_file, output_file):
"""
.. versionadded:: 0.5.7
Renames headers of a fasta file
"""
mgkit.logger.config_log(level=logging.DEBUG if verbose else logging.INFO)
random_pop = string.ascii_letters + string.digits
if prefix is not None:
if ' ' in prefix:
utils.exit_script("Prefix contains spaces: `{}`".format(prefix), 1)
if file_name:
file_name = pathlib.Path(fasta_file.name).name
if file_name == '<stdin>':
utils.exit_script("Standard input used, cannot use `-f`", 2)
base_elements = []
if prefix is not None:
base_elements.append(prefix)
if file_name:
base_elements.append(file_name)
for name, seq in fasta.load_fasta_rename(fasta_file):
elements = base_elements + [name]
if suffix_len > 0:
elements.append(
''.join(random.sample(random_pop, k=suffix_len))
)
fasta.write_fasta_sequence(output_file, separator.join(elements), seq)