Package Bio :: Package SearchIO :: Package ExonerateIO :: Module exonerate_vulgar
[hide private]
[frames] | [no frames]

Source Code for Module Bio.SearchIO.ExonerateIO.exonerate_vulgar

  1  # Copyright 2012 by Wibowo Arindrarto.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO parser for Exonerate vulgar output format.""" 
  7   
  8  import re 
  9   
 10  from Bio._py3k import _as_bytes, _bytes_to_string 
 11  from Bio._py3k import zip 
 12   
 13  from ._base import _BaseExonerateParser, _BaseExonerateIndexer, _STRAND_MAP 
 14   
 15   
 16  __all__ = ('ExonerateVulgarParser', 'ExonerateVulgarIndexer') 
 17   
 18   
 19  # precompile regex 
 20  _RE_VULGAR = re.compile(r"""^vulgar:\s+ 
 21          (\S+)\s+(\d+)\s+(\d+)\s+([\+-\.])\s+  # query: ID, start, end, strand 
 22          (\S+)\s+(\d+)\s+(\d+)\s+([\+-\.])\s+  # hit: ID, start, end, strand 
 23          (\d+)(\s+.*)$                         # score, vulgar components 
 24          """, re.VERBOSE) 
 25   
 26  _RE_VCOMP = re.compile(r""" 
 27          \s+(\S+) # vulgar label (C/M: codon/match, G: gap, N: ner, 5/3: splice 
 28                   #               site, I: intron, S: split codon, F: frameshift) 
 29          \s+(\d+) # how many residues to advance in query sequence 
 30          \s+(\d+) # how many residues to advance in hit sequence 
 31          """, re.VERBOSE) 
 32   
 33   
34 -def parse_vulgar_comp(hsp, vulgar_comp):
35 """Parses the vulgar components present in the hsp dictionary.""" 36 # containers for block coordinates 37 qstarts, qends, hstarts, hends = \ 38 [hsp['query_start']], [], [hsp['hit_start']], [] 39 # containers for split codons 40 hsp['query_split_codons'], hsp['hit_split_codons'] = [], [] 41 # containers for ner blocks 42 hsp['query_ner_ranges'], hsp['hit_ner_ranges'] = [], [] 43 # sentinels for tracking query and hit positions 44 qpos, hpos = hsp['query_start'], hsp['hit_start'] 45 # multiplier for determining sentinel movement 46 qmove = 1 if hsp['query_strand'] >= 0 else -1 47 hmove = 1 if hsp['hit_strand'] >= 0 else -1 48 49 vcomps = re.findall(_RE_VCOMP, vulgar_comp) 50 for idx, match in enumerate(vcomps): 51 label, qstep, hstep = match[0], int(match[1]), int(match[2]) 52 # check for label, must be recognized 53 assert label in 'MCGF53INS', "Unexpected vulgar label: %r" % label 54 # match, codon, or gaps 55 if label in 'MCGS': 56 # if the previous comp is not an MCGS block, it's the 57 # start of a new block 58 if vcomps[idx - 1][0] not in 'MCGS': 59 qstarts.append(qpos) 60 hstarts.append(hpos) 61 # other labels 62 # store the values in the hsp dict as a tuple of (start, stop) 63 # we're not doing anything if the label is in '53IN', as these 64 # basically tell us what the inter-block coordinates are and 65 # inter-block coordinates are automatically calculated by 66 # and HSP property 67 if label == 'S': 68 # get start and stop from parsed values 69 qstart, hstart = qpos, hpos 70 qend = qstart + qstep * qmove 71 hend = hstart + hstep * hmove 72 # adjust the start-stop ranges 73 sqstart, sqend = min(qstart, qend), max(qstart, qend) 74 shstart, shend = min(hstart, hend), max(hstart, hend) 75 # split codons 76 # XXX: is it possible to have a frameshift that introduces 77 # a codon split? If so, this may need a different treatment.. 
78 hsp['query_split_codons'].append((sqstart, sqend)) 79 hsp['hit_split_codons'].append((shstart, shend)) 80 81 # move sentinels accordingly 82 qpos += qstep * qmove 83 hpos += hstep * hmove 84 85 # append to ends if the next comp is not an MCGS block or 86 # if it's the last comp 87 if idx == len(vcomps) - 1 or \ 88 (label in 'MCGS' and vcomps[idx + 1][0] not in 'MCGS'): 89 qends.append(qpos) 90 hends.append(hpos) 91 92 # adjust coordinates 93 for seq_type in ('query_', 'hit_'): 94 strand = hsp[seq_type + 'strand'] 95 # switch coordinates if strand is < 0 96 if strand < 0: 97 # switch the starts and ends 98 hsp[seq_type + 'start'], hsp[seq_type + 'end'] = \ 99 hsp[seq_type + 'end'], hsp[seq_type + 'start'] 100 if seq_type == 'query_': 101 qstarts, qends = qends, qstarts 102 else: 103 hstarts, hends = hends, hstarts 104 105 # set start and end ranges 106 hsp['query_ranges'] = list(zip(qstarts, qends)) 107 hsp['hit_ranges'] = list(zip(hstarts, hends)) 108 return hsp
109 110
class ExonerateVulgarParser(_BaseExonerateParser):
    """Parser for Exonerate vulgar strings."""

    _ALN_MARK = 'vulgar'

    def parse_alignment_block(self, header):
        """Parse one vulgar line into the qresult, hit, and hsp dicts."""
        qresult = header['qresult']
        hit = header['hit']
        hsp = header['hsp']
        self.read_until(lambda line: line.startswith('vulgar'))
        vulgars = re.search(_RE_VULGAR, self.line)
        # (container, key) pairs in the order of the regex capture groups
        fields = (
            (qresult, 'id'),
            (hsp, 'query_start'),
            (hsp, 'query_end'),
            (hsp, 'query_strand'),
            (hit, 'id'),
            (hsp, 'hit_start'),
            (hsp, 'hit_end'),
            (hsp, 'hit_strand'),
            (hsp, 'score'),
        )
        if self.has_c4_alignment:
            # the file has c4 alignments: the vulgar values must match the
            # previously parsed header values
            for group_idx, (container, key) in enumerate(fields, start=1):
                assert container[key] == vulgars.group(group_idx)
        else:
            # no c4 alignments: the vulgar line is the source of the values
            for group_idx, (container, key) in enumerate(fields, start=1):
                container[key] = vulgars.group(group_idx)

        # normalize strands and cast coordinates into ints
        for seq_type in ('query', 'hit'):
            hsp[seq_type + '_strand'] = _STRAND_MAP[hsp[seq_type + '_strand']]
            hsp[seq_type + '_start'] = int(hsp[seq_type + '_start'])
            hsp[seq_type + '_end'] = int(hsp[seq_type + '_end'])
        # cast score into int
        hsp['score'] = int(hsp['score'])
        # store vulgar line and parse it
        # rstrip to remove line endings (otherwise gives errors in Windows)
        hsp['vulgar_comp'] = vulgars.group(10).rstrip()
        hsp = parse_vulgar_comp(hsp, hsp['vulgar_comp'])

        return {'qresult': qresult, 'hit': hit, 'hsp': hsp}
161 162
class ExonerateVulgarIndexer(_BaseExonerateIndexer):
    """Indexer class for exonerate vulgar lines."""

    _parser = ExonerateVulgarParser
    _query_mark = _as_bytes('vulgar')

    def get_qresult_id(self, pos):
        """Returns the query ID of the nearest vulgar line."""
        handle = self._handle
        handle.seek(pos)
        # the line at pos must be a vulgar line; extract the query ID from it
        line = handle.readline()
        assert line.startswith(self._query_mark), line
        matched = re.search(_RE_VULGAR, _bytes_to_string(line))
        return matched.group(1)

    def get_raw(self, offset):
        """Returns the raw bytes string of a QueryResult object from the given offset."""
        handle = self._handle
        handle.seek(offset)
        qresult_key = None
        raw_parts = []

        while True:
            line = handle.readline()
            # EOF terminates the current query result
            if not line:
                break
            if line.startswith(self._query_mark):
                line_start = handle.tell() - len(line)
                if qresult_key is None:
                    qresult_key = self.get_qresult_id(line_start)
                elif self.get_qresult_id(line_start) != qresult_key:
                    # a vulgar line for a different query starts here
                    break
            raw_parts.append(line)

        return _as_bytes('').join(raw_parts)
# if not used as a module, run the doctest
# NOTE: run_doctest is a Biopython project helper (Bio._utils), not stdlib
if __name__ == "__main__":
    from Bio._utils import run_doctest
    run_doctest()