
"""
Count Table Utilities

Map Count Table to Genes
************************

The `map` command maps information from map files to create count tables
from featureCounts output in which `uid` was used as the attribute for the
counts.

A taxonomy map can be passed if the taxonomy needs to be included in the
index of the output table. The format used for the table is `Parquet`,
which retains the Index/MultiIndex when read back with Pandas.
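
For example, assuming a map file where each line holds a key (the `uid`)
followed by one or more values, separated by tabs by default (the
identifiers below are illustrative only)::

    uid1    K00001
    uid2    K00002  K00845

Since the output is Parquet, the Index/MultiIndex survives a round trip
with Pandas, e.g. (the file name is hypothetical)::

    import pandas

    counts = pandas.read_parquet('counts.parquet')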

Concatenate Parquet Files
*************************

Allows concatenating several Pandas DataFrames that share the same index
structure. It is useful when the mapping file produces tables too big to
fit in memory.

The solution is to split the map file, create multiple Parquet files and
then concatenate them with this command.
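
As a sketch, the concatenation performed by the `cat` command is
equivalent to the following Pandas operations (file names are
hypothetical, a single index level is assumed)::

    import pandas

    parts = [
        pandas.read_parquet(name)
        for name in ('part1.parquet', 'part2.parquet')
    ]
    # level 0 holds the concatenation keys, level 1 the original index
    merged = pandas.concat(
        parts, keys=list(range(len(parts)))
    ).groupby(level=[1]).sum()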

Convert Parquet into CSV
************************

The command `to_csv` outputs a CSV file from a Parquet table.
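
This is equivalent to reading the table with Pandas and writing it back
as CSV, e.g. (file names are hypothetical)::

    import pandas

    pandas.read_parquet('counts.parquet').to_csv('counts.csv')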

Changes
*******

.. versionadded:: 0.5.7

"""

import logging
import click
from . import utils
from .. import logger
from tqdm import tqdm
import pandas
from mgkit.io import gff, open_file
from mgkit.utils.dictionary import text_to_dict


LOG = logging.getLogger(__name__)


@click.group()
@click.version_option()
@utils.cite_option
def main():
    "Main function"


def stream_feature_counts(count_file, sample_func=None, gene_ids=None):
    """
    Streams a featureCounts table, yielding (gene_id, length, counts)
    tuples, where *counts* is a sparse Series indexed by sample ID
    """
    LOG.info("Reading featureCounts file %s", count_file)
    # comment at the start
    _ = count_file.readline()
    # header
    _, _, _, _, _, _, *sample_ids = count_file.readline().strip().split('\t')
    if sample_func is not None:
        sample_ids = [sample_func(sample_id) for sample_id in sample_ids]
    for line in count_file:
        if line.startswith('#'):
            continue
        gene_id, seq_id, start, end, strand, length, *counts = \
            line.strip().split('\t')
        # skip rows not in the map
        if (gene_ids is not None) and (gene_id not in gene_ids):
            continue
        length = int(length)
        counts = pandas.Series(
            (int(count) for count in counts),
            index=sample_ids,
            dtype=pandas.SparseDtype('int', 0)
        )
        yield gene_id, length, counts
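

# Usage sketch (the file name is hypothetical): each yielded tuple pairs a
# gene_id and its length with a sparse Series of per-sample counts, e.g.
#
#   with open('featurecounts.txt') as handle:
#       for gene_id, length, counts in stream_feature_counts(handle):
#           print(gene_id, length, counts.sum())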


@main.command('map', help="""Map counts with information from a dictionary file""")
@click.option('-v', '--verbose', is_flag=True)
@click.option('-m', '--map-file', type=click.File('r'), required=True,
              help="Map file to use")
@click.option('-t', '--taxa-map', type=click.File('r'), default=None,
              help="Taxa map file")
@click.option('-s', '--separator', default="\t", show_default=True,
              type=click.STRING, help="Field separator for map file Key/Value")
@click.option('-sv', '--split-value', default=False, show_default=True,
              is_flag=True, help="Values are string to be split")
@click.argument('count_file', type=click.File('r', lazy=False), default='-')
@click.argument('output_file', type=click.File('wb'), default='-')
def map_counts(verbose, map_file, taxa_map, separator, split_value,
               count_file, output_file):
    """
    .. versionadded:: 0.5.7

    Map counts from a key specified in the featureCounts file (first column)
    to another
    """
    logger.config_log(level=logging.DEBUG if verbose else logging.INFO)

    if taxa_map is not None:
        taxa_data = {}
        LOG.info("Reading Taxa Map file %s", taxa_map)
        for line in taxa_map:
            key, taxon = line.strip().split(separator)
            taxa_data[key] = taxon
        LOG.info("Finished reading Taxa Map file")

    map_data = {}
    LOG.info("Reading Map file %s", map_file)
    for line in map_file:
        key, *values = line.strip().split(separator)
        if split_value:
            values = list(values[0])
        map_data[key] = set(values)
    LOG.info("Finished reading Map file")

    iterator = stream_feature_counts(count_file, gene_ids=map_data)

    count_data = {}
    pbar = tqdm(desc="Mappings", total=len(map_data))
    for key, length, count in iterator:
        # key was found, so the progress bar needs updating
        pbar.update(1)
        for map_id in map_data[key]:
            if taxa_map is not None:
                map_id = (map_id, taxa_data.get(key, None))
            try:
                count_data[map_id] = count_data[map_id] + count
            except KeyError:
                count_data[map_id] = count
        # if all keys were found, break the loop
        if pbar.n == len(map_data):
            pbar.close()
            LOG.info("Used all the data from the Map File")
            break

    LOG.info("Building DataFrame")
    count_data = pandas.DataFrame.from_dict(
        count_data, orient='index').sort_index()
    count_data.to_parquet(output_file, engine='pyarrow')


@main.command('cat', help="""Combine multiple count table files""")
@click.option('-v', '--verbose', is_flag=True)
@click.option('-o', '--output', required=True, help="Output file",
              type=click.Path(file_okay=True, writable=True))
@click.argument('count_files', type=click.Path(file_okay=True, readable=True),
                nargs=-1)
def cat_count_files(verbose, output, count_files):
    """
    .. versionadded:: 0.5.7

    Concatenate multiple count tables - use when there are memory
    constraints with bigger count tables
    """
    logger.config_log(level=logging.DEBUG if verbose else logging.INFO)

    dataframes = []
    for count_file in count_files:
        LOG.info("Reading file %s", count_file)
        dataframes.append(pandas.read_parquet(count_file))

    n_levels = dataframes[0].index.nlevels
    keys = list(range(len(dataframes)))

    dataframe = pandas.concat(
        dataframes,
        keys=keys,
    ).groupby(level=list(range(1, n_levels + 1))).sum()

    dataframe.to_parquet(output)


@main.command('to_csv', help="Convert Parquet tables into CSV")
@click.option('-v', '--verbose', is_flag=True)
@click.argument('parquet_file', type=click.Path(file_okay=True, readable=True))
@click.argument('csv_file', type=click.File('w'), default='-')
def to_csv_command(verbose, parquet_file, csv_file):
    """
    .. versionadded:: 0.5.7

    Converts a Parquet table to CSV
    """
    logger.config_log(level=logging.DEBUG if verbose else logging.INFO)

    dataframe = pandas.read_parquet(parquet_file)
    dataframe.to_csv(csv_file)
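
# Note on the `map` output (illustrative): without a taxa map the DataFrame
# is indexed by the mapped IDs alone; with `-t`/`--taxa-map` each row is
# keyed by a (map_id, taxon) tuple, which Pandas stores as a MultiIndex and
# Parquet preserves on a round trip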