Source code for mgkit.utils.dictionary

"""
Dictionary utils

"""
from builtins import range
import logging
import pathlib
import collections
import json
from future.utils import viewitems, viewkeys
import numpy
from tqdm import tqdm

import pandas

LOG = logging.getLogger(__name__)

def merge_dictionaries(dicts):
    """
    .. versionadded:: 0.3.1

    Merges keys and values from a list/iterable of dictionaries. The
    resulting dictionary's values are converted into sets, with the
    assumption that the values are one of the following: float, str, int,
    bool
    """
    merged = {}

    for d in dicts:
        for key, value in viewitems(d):
            if isinstance(value, (float, str, int, bool)):
                value = [value]
            try:
                merged[key].update(value)
            except KeyError:
                merged[key] = set(value)

    return merged

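# Usage sketch (not part of the original module, data is made up): scalar
# values from several dictionaries are pooled into one set per key.
#
# >>> merged = merge_dictionaries([{'a': 1}, {'a': 2, 'b': 'x'}])
# >>> merged['a'] == {1, 2} and merged['b'] == {'x'}
# True
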
def combine_dict(keydict, valuedict):
    """
    Combine two dictionaries when the values of keydict are iterables. The
    combined dictionary has the same keys as keydict and its values are sets
    containing all the values associated in valuedict to the values of
    keydict.

    .. digraph:: keydict
        :alt: key1 -> [v1, v2, .., vN]

        "keydict" -> "key1" -> "[v1, v2, .., vN]";

    .. digraph:: valuedict
        :alt: v1 -> [u1, u2, .., uN]
            v2 -> [t1, t2, .., tN]

        "valuedict" -> "v1" -> "[u1, u2, .., uN]";
        "valuedict" -> "v2" -> "[t1, t2, .., tN]";

    Resulting dictionary will be

    .. digraph:: combined
        :alt: key1->{u1, u2, .., uN}

        "combined" -> "key1" -> "{u1, u2, .., uN, t1, t2, .., tN}";

    :param dict keydict: dictionary whose keys are the same as the returned
        dictionary
    :param dict valuedict: dictionary whose values are the same as the
        returned dictionary

    :return dict: combined dictionary
    """
    comb_dict = dict((key, set()) for key in keydict)

    for key, values in keydict.items():
        for value in values:
            try:
                comb_dict[key].update(valuedict[value])
            except KeyError:
                # if a value is not among valuedict's keys, skip it silently
                pass

    return comb_dict

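# Usage sketch (hypothetical data): keydict maps keys to iterables of ids,
# valuedict maps those ids to iterables; the result pools the latter per key.
#
# >>> combine_dict({'k': ['v1', 'v2']}, {'v1': [1, 2], 'v2': [2, 3]}) == {'k': {1, 2, 3}}
# True
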
def combine_dict_one_value(keydict, valuedict):
    """
    Combine two dictionaries where each value of keydict is used as a key in
    valuedict; the resulting dictionary has the keys of keydict and the
    corresponding values of valuedict.

    Same as :func:`combine_dict`, but each value in keydict is a single
    element that is a key in valuedict.

    :param dict keydict: dictionary whose keys are the same as the returned
        dictionary
    :param dict valuedict: dictionary whose values are the same as the
        returned dictionary

    :return dict: combined dictionary
    """
    comb_dict = {}

    for key, value in keydict.items():
        comb_dict[key] = valuedict[value]

    return comb_dict

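# Usage sketch (hypothetical data): each keydict value is looked up directly
# in valuedict.
#
# >>> combine_dict_one_value({'k': 'v1'}, {'v1': [1, 2]})
# {'k': [1, 2]}
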
def reverse_mapping(map_dict):
    """
    Given a dictionary in the form: key->[v1, v2, .., vN], returns a
    dictionary in the form: v1->[key1, key2, .., keyN]

    :param dict map_dict: dictionary to reverse

    :return dict: reversed dictionary
    """
    rev_map = {}
    for key, value_ids in viewitems(map_dict):
        for value_id in value_ids:
            try:
                rev_map[value_id].add(key)
            except KeyError:
                rev_map[value_id] = set([key])

    return rev_map

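# Usage sketch (made-up mapping): the returned values are sets of the
# original keys.
#
# >>> reverse_mapping({'k1': ['a', 'b'], 'k2': ['a']}) == {'a': {'k1', 'k2'}, 'b': {'k1'}}
# True
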
def find_id_in_dict(s_id, s_dict):
    """
    Finds a value 's_id' in a dictionary whose values are iterables. Returns
    a list of the keys whose value contains 's_id'.

    :param s_id: element to look for in the dictionary's values
    :param dict s_dict: dictionary to search in

    :return list: list of keys whose values contain 's_id'
    """
    f_list = []

    for k_id, v_ids in s_dict.items():
        if s_id in v_ids:
            f_list.append(k_id)

    return f_list

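# Usage sketch (made-up data): only keys whose value contains 'a' are
# returned.
#
# >>> find_id_in_dict('a', {'k1': ['a', 'b'], 'k2': ['c']})
# ['k1']
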
def split_dictionary_by_value(value_dict, threshold, aggr_func=numpy.median,
                              key_filter=None):
    """
    Splits a dictionary whose values are iterables, based on a threshold:

    * one dictionary where the result of aggr_func is lower than the
      threshold (first)
    * one dictionary where the result of aggr_func is equal to or greater
      than the threshold (second)

    :param dict value_dict: dictionary to be split
    :param number threshold: the aggregated values must be comparable to this
        threshold
    :param func aggr_func: function used to aggregate the dictionary values
    :param iterable key_filter: if specified, only these keys will be in the
        resulting dictionaries

    :return: two dictionaries
    """
    lower_dict = {}
    higher_dict = {}

    if key_filter is None:
        key_filter = viewkeys(value_dict)

    for key in key_filter:
        values = value_dict[key]
        if aggr_func(values) < threshold:
            lower_dict[key] = values
        else:
            higher_dict[key] = values

    return lower_dict, higher_dict

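# Usage sketch (made-up data): with the default aggr_func (numpy.median), the
# first dictionary collects keys whose median is below the threshold.
#
# >>> lower, higher = split_dictionary_by_value({'k1': [1, 2, 3], 'k2': [10, 20, 30]}, 5)
# >>> lower
# {'k1': [1, 2, 3]}
# >>> higher
# {'k2': [10, 20, 30]}
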
def apply_func_to_values(dictionary, func):
    """
    .. versionadded:: 0.1.12

    Assuming a dictionary whose values are iterables, *func* is applied to
    each element of the iterable, returning a *set* of all transformed
    elements.

    Arguments:
        dictionary (dict): dictionary whose values are iterables
        func (func): function to apply to the dictionary values

    Returns:
        dict: dictionary with transformed values
    """
    return dict(
        (key, set(func(value) for value in values))
        for key, values in viewitems(dictionary)
    )

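# Usage sketch (made-up data): stripping a suffix from every element of each
# value, keeping only the unique prefixes.
#
# >>> apply_func_to_values({'k': ['a.1', 'a.2', 'b.1']}, lambda x: x.split('.')[0]) == {'k': {'a', 'b'}}
# True
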
def filter_ratios_by_numbers(ratios, min_num):
    """
    Returns only the items of a dictionary whose values are iterables with at
    least min_num elements.

    :param dict ratios: dictionary key->list
    :param int min_num: minimum number of elements in the value iterable

    :return dict: filtered dictionary
    """
    return dict(
        (key, values)
        for key, values in viewitems(ratios)
        if len(values) >= min_num
    )

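# Usage sketch (made-up data): keys with fewer than min_num values are
# dropped.
#
# >>> filter_ratios_by_numbers({'k1': [1.0, 2.0], 'k2': [0.5]}, 2)
# {'k1': [1.0, 2.0]}
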
def filter_nan(ratios):
    """
    Returns a dictionary with the NaN values taken out
    """
    return dict(
        (key, [ratio for ratio in ratios[key] if not numpy.isnan(ratio)])
        for key in ratios
    )

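# Usage sketch (made-up data): NaN elements are removed from each value list.
#
# >>> filter_nan({'k': [1.0, float('nan'), 2.0]})
# {'k': [1.0, 2.0]}
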
class cache_dict_file(object):
    """
    .. versionadded:: 0.3.0

    Used to cache the result of a function that yields a tuple (key and
    value). If the key is found in the internal dictionary (as the class
    behaves like one), the corresponding value is returned, otherwise the
    iterator is advanced until the key is found.

    Example:
        >>> from mgkit.io.blast import parse_accession_taxa_table
        >>> i = parse_accession_taxa_table('nucl_gb.accession2taxid.gz', key=0)
        >>> d = cache_dict_file(i)
        >>> d['AH001684']
        4400
    """
    _iterator = None
    _dict = None

    def __init__(self, iterator, skip_lines=0):
        """
        Arguments:
            iterator (iter): iterator used in building the dictionary
            skip_lines (int): how many iterations to skip at the start
        """
        for index in range(skip_lines):
            next(iterator)

        self._iterator = iterator
        self._dict = {}

    def __getitem__(self, key):
        try:
            value = self._dict[key]
        except KeyError:
            # advance the iterator, caching every pair, until the key appears
            while True:
                nkey, nvalue = self.next()
                if nkey == key:
                    value = nvalue
                    break

        return value

    def next(self):
        try:
            key, value = next(self._iterator)
        except StopIteration:
            raise KeyError
        self._dict[key] = value
        return key, value

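# Usage sketch with an in-memory iterator instead of a parsed file (the
# docstring example uses mgkit.io.blast; the tuples below are made up).
#
# >>> d = cache_dict_file(iter([('a', 1), ('b', 2)]))
# >>> d['b']
# 2
# >>> d['a']  # 'a' was cached while scanning for 'b'
# 1
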
class HDFDict(object):
    """
    .. versionchanged:: 0.3.3
        added *cache* in __init__

    .. versionadded:: 0.3.1

    Uses a table in a HDFStore (from pandas) as a dictionary. The table must
    be indexed to perform well. Read only.

    .. note::

        the dictionary cannot be modified and :exc:`ValueError` will be
        raised if the table is not in the file
    """
    def __init__(self, file_name, table, cast=int, cache=True):
        self._hdf = pandas.HDFStore(file_name, mode='r')
        self._table = table
        self._cast = cast
        if self._table not in self._hdf:
            # close the store before raising, so the file handle is released
            self._hdf.close()
            raise ValueError(
                "Table ({}) not found in file ({})".format(table, file_name)
            )
        self._cache = {} if cache else None

    def _getitem_cache(self, key):
        value = self._cache.get(key, None)
        if value is None:
            value = self._getitem_hdf(key)
            self._cache[key] = value
        return value

    def _getitem_hdf(self, key):
        df = self._hdf.select(self._table, 'index=key')
        if df.empty:
            raise KeyError('Key not found {}'.format(key))
        return self._cast(df.values)

    def __getitem__(self, key):
        # special methods are looked up on the class, not the instance, so
        # the choice between cached and uncached lookup is made here instead
        # of assigning __getitem__ on the instance
        if self._cache is not None:
            return self._getitem_cache(key)
        return self._getitem_hdf(key)

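# Usage sketch (assumes the optional PyTables dependency is installed; the
# file name 'example.hdf' and table name 'taxa' are made up): a DataFrame
# written in 'table' format can be queried by index through HDFDict.
#
# >>> import pandas as pd
# >>> df = pd.DataFrame({'taxon_id': [9606, 562]}, index=['K00001', 'K00002'])
# >>> df.to_hdf('example.hdf', key='taxa', format='table')
# >>> HDFDict('example.hdf', 'taxa')['K00002']
# 562
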
def dict_to_text(stream, dictionary, header=None, comment=None, sep='\t'):
    """
    .. versionadded:: 0.4.4

    Writes the content of a dictionary to a stream (supports *write*), like
    io.StringIO or an opened file. Intended to be used only for dictionaries
    with key-value of type integer/strings, other data types are better
    served by more complex options, like JSON, etc.

    .. warning::

        The file is expected to be opened in text mode ('w')

    Arguments:
        stream (file): stream to write to, to output a string, use io.StringIO
        dictionary (dict): dictionary to write
        header (iterable): a tuple/list to be used as header
        comment (str): a comment at the start of the file - '# ' will be
            prepended to the value passed.
        sep (str): column separator to use
    """
    if comment is not None:
        stream.write('# {}\n'.format(comment))
    if header is not None:
        stream.write('{}\n'.format(sep.join(header)))
    for key, value in dictionary.items():
        stream.write(f'{key}{sep}{value}\n')

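# Usage sketch writing to an in-memory stream (the dictionary and separator
# are made up; a comma is used so the output is easier to read than tabs).
#
# >>> import io
# >>> out = io.StringIO()
# >>> dict_to_text(out, {'k1': 1, 'k2': 2}, header=['key', 'value'], sep=',')
# >>> print(out.getvalue(), end='')
# key,value
# k1,1
# k2,2
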
def text_to_dict(stream, skip_lines=0, sep='\t', key_index=0, value_index=1,
                 key_func=str, value_func=str, encoding=None,
                 skip_empty=False, skip_comment=None, verbose=False):
    """
    .. versionadded:: 0.4.4

    .. versionchanged:: 0.5.5
        added *skip_comment* and *skip_empty*

    .. versionchanged:: 0.5.7
        added *verbose* parameter

    Reads a dictionary from a table file. The passed file is assumed to be
    opened as text, not binary - in which case you need to pass the encoding
    (e.g. *ascii*). The file may have multiple columns, so the key and value
    columns can be chosen with *key_index* and *value_index*, respectively.

    Arguments:
        stream (file): stream that can be read as a file
        skip_lines (int): number of lines to skip at the start of the file
        sep (str): column separator to use
        key_index (int): zero-based column number of keys
        value_index (int): zero-based column number of values
        key_func (func): function to apply to the keys (defaults to *str*)
        value_func (func): function to apply to the values (defaults to *str*)
        encoding (None, str): if *None* is passed, the file is assumed to be
            opened in text mode, otherwise the encoding of the file must be
            passed
        skip_empty (bool): if True, an empty value will not be yielded
        skip_comment (None, str): if a value other than None is passed, lines
            starting with this parameter value will be skipped
        verbose (bool): if True, logs information about the file read

    Yields:
        tuple: the keys and values that can be passed to *dict*
    """
    count = 0
    for index, line in enumerate(stream):
        if skip_lines > 0:
            skip_lines -= 1
            continue
        if encoding is not None:
            line = line.decode(encoding)
        line = line.rstrip('\n')
        # skip empty lines and (optionally) comment lines
        if (not line) or ((skip_comment is not None) and
                          line.startswith(skip_comment)):
            continue
        line = line.split(sep)
        key = key_func(line[key_index])
        value = value_func(line[value_index])
        if skip_empty and (not value):
            continue
        count += 1
        yield key, value

    if verbose:
        LOG.info("Used %d lines out of %d", count, index + 1)

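# Usage sketch reading back the kind of table written by dict_to_text (the
# string content is made up); note the generator must be passed to dict().
#
# >>> import io
# >>> text = io.StringIO('key,value\nk1,1\nk2,2\n')
# >>> dict(text_to_dict(text, skip_lines=1, sep=',', value_func=int))
# {'k1': 1, 'k2': 2}
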