Source code for phe.utils.reader

'''
:Date: 3 June, 2016
:Author: Public Health England
'''
from collections import Counter, defaultdict

import vcf

from phe.utils import is_uncallable


[docs]class ParallelVCFReader(object): """Read multiple VCFs in parallel - one position at the time.""" def __init__(self, vcfs): """Instantiate ParallelVCFReader. Parameters ---------- vcfs: list List of path s to the VCF files to read. """ self._readers = { vcf_in: vcf.Reader(filename=vcf_in) for vcf_in in vcfs} self._records = {} self.update() def __iter__(self): return self.get_records() def _vote_best_ref(self, refs): """Pick the best reference by counting most frequent occurence.""" counts = 0 try: counts = max(Counter(refs).iteritems(), key=lambda x: x[1])[0] except ValueError: counts = 0 return counts
[docs] def get_samples(self): """Get the list of sample names from the VCFs.""" return [reader.samples[0] for reader in self._readers.itervalues()]
[docs] def update(self, ids=None): """Update records in the readers. If *ids* is **not** specified, then all readers are updated. Otherwise, only those with the same *id* are updated. Parameters ---------- ids: list, optional Optional list of IDs to update. If **None**, all will be updated. """ if ids is None: ids = self._readers.keys() for k in ids: try: self._records[k] = self._readers[k].next() self._records[k].__setattr__("is_uncallable", is_uncallable(self._records[k])) except StopIteration: self._records[k] = None
[docs] def get_records(self): """Generator of records, one position at the time. If a position has more than 1 record in any of the readers, a list of all records with that position is returned. The user can deal appropriately with the records list. Returns ------- chrom: str Chromosome pos: int Position records: dict Records in dictionary of lists for each sample. .. code-block:: python { "sample1": [record1, record2], "sample2": [record3] } """ chrom = None # Iterate while we have reader and they are not None. while self._readers and not all(r is None for r in self._records.itervalues()): # Find the 'best' - most frequent chromosome. if chrom is None: chrom = self._vote_best_ref([ record.CHROM for record in self._records.itervalues() if record]) # Find the minimum position for that chromosome. try: # Find the lowest record position in all readers. min_pos = min([ record.POS for record in self._records.itervalues() if record and record.CHROM == chrom]) except ValueError: # This happens when all records have different chrom. chrom = self._vote_best_ref([ record.CHROM for record in self._records.itervalues() if record]) continue # Only get the records for vcfs that are in lowest currect position. records = defaultdict(list) # Pull out all records for that position + chromosome from readers. for vcf_in, record in self._records.iteritems(): while record and record.CHROM == chrom and record.POS == min_pos: records[self._readers[vcf_in].samples[0]].append(record) self.update([vcf_in]) record = self._records[vcf_in] if not records: continue # Yield them to iterator. yield chrom, min_pos, records # Update the records only of the readers we used, i.e. lowest position. for vcf_in in self._readers: if self._records[vcf_in] and self._records[vcf_in].POS == min_pos and self._records[vcf_in].CHROM == chrom: self.update([vcf_in])