Package Bio :: Package SearchIO :: Package ExonerateIO :: Module exonerate_vulgar
[hide private]
[frames] | no frames]

Source Code for Module Bio.SearchIO.ExonerateIO.exonerate_vulgar

  1  # Copyright 2012 by Wibowo Arindrarto.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO parser for Exonerate vulgar output format.""" 
  7   
  8  import re 
  9   
 10  from Bio._py3k import _as_bytes, _bytes_to_string 
 11   
 12  from _base import _BaseExonerateParser, _BaseExonerateIndexer, _STRAND_MAP 
 13   
 14   
 15  __all__ = ['ExonerateVulgarParser', 'ExonerateVulgarIndexer'] 
 16   
 17   
 18  # precompile regex 
 19  _RE_VULGAR = re.compile(r"""^vulgar:\s+ 
 20          (\S+)\s+(\d+)\s+(\d+)\s+([\+-\.])\s+  # query: ID, start, end, strand 
 21          (\S+)\s+(\d+)\s+(\d+)\s+([\+-\.])\s+  # hit: ID, start, end, strand 
 22          (\d+)(\s+.*)$                         # score, vulgar components 
 23          """, re.VERBOSE) 
 24   
 25  _RE_VCOMP = re.compile(r""" 
 26          \s+(\S+) # vulgar label (C/M: codon/match, G: gap, N: ner, 5/3: splice 
 27                   #               site, I: intron, S: split codon, F: frameshift) 
 28          \s+(\d+) # how many residues to advance in query sequence 
 29          \s+(\d+) # how many residues to advance in hit sequence 
 30          """, re.VERBOSE) 
 31   
 32   
33 -def parse_vulgar_comp(hsp, vulgar_comp):
34 """Parses the vulgar components present in the hsp dictionary.""" 35 # containers for block coordinates 36 qstarts, qends, hstarts, hends = \ 37 [hsp['query_start']], [], [hsp['hit_start']], [] 38 # containers for split codons 39 hsp['query_split_codons'], hsp['hit_split_codons'] = [], [] 40 # containers for ner blocks 41 hsp['query_ner_ranges'], hsp['hit_ner_ranges'] = [], [] 42 # sentinels for tracking query and hit positions 43 qpos, hpos = hsp['query_start'], hsp['hit_start'] 44 # multiplier for determining sentinel movement 45 qmove = 1 if hsp['query_strand'] >= 0 else -1 46 hmove = 1 if hsp['hit_strand'] >= 0 else -1 47 48 vcomps = re.findall(_RE_VCOMP, vulgar_comp) 49 for idx, match in enumerate(vcomps): 50 label, qstep, hstep = match[0], int(match[1]), int(match[2]) 51 # check for label, must be recognized 52 assert label in 'MCGF53INS', "Unexpected vulgar label: %r" % label 53 # match, codon, or gaps 54 if label in 'MCGS': 55 # if the previous comp is not an MCGS block, it's the 56 # start of a new block 57 if vcomps[idx-1][0] not in 'MCGS': 58 qstarts.append(qpos) 59 hstarts.append(hpos) 60 # other labels 61 # store the values in the hsp dict as a tuple of (start, stop) 62 # we're not doing anything if the label is in '53IN', as these 63 # basically tell us what the inter-block coordinates are and 64 # inter-block coordinates are automatically calculated by 65 # and HSP property 66 if label == 'S': 67 # get start and stop from parsed values 68 qstart, hstart = qpos, hpos 69 qend = qstart + qstep * qmove 70 hend = hstart + hstep * hmove 71 # adjust the start-stop ranges 72 sqstart, sqend = min(qstart, qend), max(qstart, qend) 73 shstart, shend = min(hstart, hend), max(hstart, hend) 74 # split codons 75 # XXX: is it possible to have a frameshift that introduces 76 # a codon split? If so, this may need a different treatment.. 77 hsp['query_split_codons'].append((sqstart, sqend)) 78 hsp['hit_split_codons'].append((shstart, shend)) 79 80 # move sentinels accordingly 81 qpos += qstep * qmove 82 hpos += hstep * hmove 83 84 # append to ends if the next comp is not an MCGS block or 85 # if it's the last comp 86 if idx == len(vcomps)-1 or \ 87 (label in 'MCGS' and vcomps[idx+1][0] not in 'MCGS'): 88 qends.append(qpos) 89 hends.append(hpos) 90 91 # adjust coordinates 92 for seq_type in ('query_', 'hit_'): 93 strand = hsp[seq_type + 'strand'] 94 # switch coordinates if strand is < 0 95 if strand < 0: 96 # switch the starts and ends 97 hsp[seq_type + 'start'], hsp[seq_type + 'end'] = \ 98 hsp[seq_type + 'end'], hsp[seq_type + 'start'] 99 if seq_type == 'query_': 100 qstarts, qends = qends, qstarts 101 else: 102 hstarts, hends = hends, hstarts 103 104 # set start and end ranges 105 hsp['query_ranges'] = zip(qstarts, qends) 106 hsp['hit_ranges'] = zip(hstarts, hends) 107 return hsp
108 109
110 -class ExonerateVulgarParser(_BaseExonerateParser):
111 112 """Parser for Exonerate vulgar strings.""" 113 114 _ALN_MARK = 'vulgar' 115
116 - def parse_alignment_block(self, header):
117 qresult = header['qresult'] 118 hit = header['hit'] 119 hsp = header['hsp'] 120 self.read_until(lambda line: line.startswith('vulgar')) 121 vulgars = re.search(_RE_VULGAR, self.line) 122 # if the file has c4 alignments 123 # check if vulgar values match our previously parsed header values 124 if self.has_c4_alignment: 125 assert qresult['id'] == vulgars.group(1) 126 assert hsp['query_start'] == vulgars.group(2) 127 assert hsp['query_end'] == vulgars.group(3) 128 assert hsp['query_strand'] == vulgars.group(4) 129 assert hit['id'] == vulgars.group(5) 130 assert hsp['hit_start'] == vulgars.group(6) 131 assert hsp['hit_end'] == vulgars.group(7) 132 assert hsp['hit_strand'] == vulgars.group(8) 133 assert hsp['score'] == vulgars.group(9) 134 else: 135 qresult['id'] = vulgars.group(1) 136 hsp['query_start'] = vulgars.group(2) 137 hsp['query_end'] = vulgars.group(3) 138 hsp['query_strand'] = vulgars.group(4) 139 hit['id'] = vulgars.group(5) 140 hsp['hit_start'] = vulgars.group(6) 141 hsp['hit_end'] = vulgars.group(7) 142 hsp['hit_strand'] = vulgars.group(8) 143 hsp['score'] = vulgars.group(9) 144 145 # adjust strands 146 hsp['hit_strand'] = _STRAND_MAP[hsp['hit_strand']] 147 hsp['query_strand'] = _STRAND_MAP[hsp['query_strand']] 148 # cast coords into ints 149 hsp['query_start'] = int(hsp['query_start']) 150 hsp['query_end'] = int(hsp['query_end']) 151 hsp['hit_start'] = int(hsp['hit_start']) 152 hsp['hit_end'] = int(hsp['hit_end']) 153 # cast score into int 154 hsp['score'] = int(hsp['score']) 155 # store vulgar line and parse it 156 hsp['vulgar_comp'] = vulgars.group(10) 157 hsp = parse_vulgar_comp(hsp, hsp['vulgar_comp']) 158 159 return {'qresult': qresult, 'hit': hit, 'hsp': hsp}
160 161
162 -class ExonerateVulgarIndexer(_BaseExonerateIndexer):
163 164 """Indexer class for exonerate vulgar lines.""" 165 166 _parser = ExonerateVulgarParser 167 _query_mark = _as_bytes('vulgar') 168
169 - def get_qresult_id(self, pos):
170 """Returns the query ID of the nearest vulgar line.""" 171 handle = self._handle 172 handle.seek(pos) 173 # get line, check if it's a vulgar line, and get query ID 174 line = handle.readline() 175 assert line.startswith(self._query_mark), line 176 id = re.search(_RE_VULGAR, _bytes_to_string(line)) 177 return id.group(1)
178
179 - def get_raw(self, offset):
180 """Returns the raw string of a QueryResult object from the given offset.""" 181 handle = self._handle 182 handle.seek(offset) 183 qresult_key = None 184 qresult_raw = _as_bytes('') 185 186 while True: 187 line = handle.readline() 188 if not line: 189 break 190 elif line.startswith(self._query_mark): 191 cur_pos = handle.tell() - len(line) 192 if qresult_key is None: 193 qresult_key = self.get_qresult_id(cur_pos) 194 else: 195 curr_key = self.get_qresult_id(cur_pos) 196 if curr_key != qresult_key: 197 break 198 qresult_raw += line 199 200 return qresult_raw
201 202 203 # if not used as a module, run the doctest 204 if __name__ == "__main__": 205 from Bio._utils import run_doctest 206 run_doctest() 207