Package Bio :: Package SearchIO :: Package ExonerateIO :: Module _base
[hide private]
[frames] | no frames]

Source Code for Module Bio.SearchIO.ExonerateIO._base

  1  # Copyright 2012 by Wibowo Arindrarto.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO abstract base parser for Exonerate standard output format.""" 
  7   
  8  import re 
  9  from functools import reduce 
 10   
 11  from Bio.SearchIO._index import SearchIndexer 
 12  from Bio.SearchIO._model import QueryResult, Hit, HSP, HSPFragment 
 13  from Bio.SeqUtils import seq1 
 14   
 15   
 16  # strand char-value mapping 
 17  _STRAND_MAP = {'+': 1, '-': -1, '.': 0} 
 18   
 19  _RE_SHIFTS = re.compile(r'(#+)') 
 20  # regex for checking whether a vulgar line has protein/translated components 
 21  _RE_TRANS = re.compile(r'[53ISCF]') 
 22   
 23   
24 -def _set_frame(frag):
25 """Set the HSPFragment frames (PRIVATE).""" 26 frag.hit_frame = (frag.hit_start % 3 + 1) * frag.hit_strand 27 frag.query_frame = (frag.query_start % 3 + 1) * frag.query_strand
28 29
30 -def _make_triplets(seq, phase=0):
31 """Select a valid amino acid sequence given a 3-letter code input (PRIVATE). 32 33 This function takes a single three-letter amino acid sequence and the phase 34 of the sequence to return the longest intact amino acid sequence possible. 35 Parts of the input sequence before and after the selected sequence are also 36 returned. 37 38 This is an internal private function and is meant for parsing Exonerate's 39 three-letter amino acid output. 40 41 >>> from Bio.SearchIO.ExonerateIO._base import _make_triplets 42 >>> _make_triplets('GlyThrSerAlaPro') 43 ('', ['Gly', 'Thr', 'Ser', 'Ala', 'Pro'], '') 44 >>> _make_triplets('yThSerAla', phase=1) 45 ('y', ['Thr', 'Ser', 'Ala'], '') 46 >>> _make_triplets('yThSerAlaPr', phase=1) 47 ('y', ['Thr', 'Ser', 'Ala'], 'Pr') 48 49 """ 50 pre = seq[:phase] 51 np_seq = seq[phase:] 52 non_triplets = len(np_seq) % 3 53 post = "" if not non_triplets else np_seq[-1 * non_triplets:] 54 intacts = [np_seq[3 * i:3 * (i + 1)] 55 for i in range(len(np_seq) // 3)] 56 return pre, intacts, post
57 58
59 -def _get_fragments_coord(frags):
60 """Return the letter coordinate of the given list of fragments (PRIVATE). 61 62 This function takes a list of three-letter amino acid sequences and 63 returns a list of coordinates for each fragment had all the input 64 sequences been flattened. 65 66 This is an internal private function and is meant for parsing Exonerate's 67 three-letter amino acid output. 68 69 >>> from Bio.SearchIO.ExonerateIO._base import _get_fragments_coord 70 >>> _get_fragments_coord(['Thr', 'Ser', 'Ala']) 71 [0, 3, 6] 72 >>> _get_fragments_coord(['Thr', 'SerAlaPro', 'GlyLeu']) 73 [0, 3, 12, ] 74 >>> _get_fragments_coord(['Thr', 'SerAlaPro', 'GlyLeu', 'Cys']) 75 [0, 3, 12, 18] 76 77 """ 78 if not frags: 79 return [] 80 # first fragment always starts from position 0 81 init = [0] 82 return reduce(lambda acc, frag: acc + [acc[-1] + len(frag)], 83 frags[:-1], init)
84 85
86 -def _get_fragments_phase(frags):
87 """Return the phases of the given list of 3-letter amino acid fragments (PRIVATE). 88 89 This is an internal private function and is meant for parsing Exonerate's 90 three-letter amino acid output. 91 92 >>> from Bio.SearchIO.ExonerateIO._base import _get_fragments_phase 93 >>> _get_fragments_phase(['Thr', 'Ser', 'Ala']) 94 [0, 0, 0] 95 >>> _get_fragments_phase(['ThrSe', 'rAla']) 96 [0, 1] 97 >>> _get_fragments_phase(['ThrSe', 'rAlaLeu', 'ProCys']) 98 [0, 1, 0] 99 >>> _get_fragments_phase(['ThrSe', 'rAlaLeuP', 'roCys']) 100 [0, 1, 2] 101 >>> _get_fragments_phase(['ThrSe', 'rAlaLeuPr', 'oCys']) 102 [0, 1, 1] 103 104 """ 105 return [(3 - (x % 3)) % 3 for x in _get_fragments_coord(frags)]
106 107
108 -def _adjust_aa_seq(fraglist):
109 """Transform 3-letter AA codes of input fragments to one-letter codes (PRIVATE). 110 111 Argument fraglist should be a list of HSPFragments objects. 112 """ 113 custom_map = {'***': '*', '<->': '-'} 114 hsp_hstart = fraglist[0].hit_start 115 hsp_qstart = fraglist[0].query_start 116 frag_phases = _get_fragments_phase(fraglist) 117 for frag, phase in zip(fraglist, frag_phases): 118 assert frag.query_strand == 0 or frag.hit_strand == 0 119 # hit step may be -1 as we're aligning to DNA 120 hstep = 1 if frag.hit_strand >= 0 else -1 121 122 # set fragment phase 123 frag.phase = phase 124 125 # fragment should have a length that is a multiple of 3 126 # assert len(frag) % 3 == 0 127 qseq = str(frag.query.seq) 128 q_triplets_pre, q_triplets, q_triplets_post = \ 129 _make_triplets(qseq, phase) 130 131 hseq = str(frag.hit.seq) 132 h_triplets_pre, h_triplets, h_triplets_post = \ 133 _make_triplets(hseq, phase) 134 135 # get one letter codes 136 # and replace gap codon markers and termination characters 137 hseq1_pre = "X" if h_triplets_pre else "" 138 hseq1_post = "X" if h_triplets_post else "" 139 hseq1 = seq1("".join(h_triplets), custom_map=custom_map) 140 hstart = hsp_hstart + (len(hseq1_pre) * hstep) 141 hend = hstart + len(hseq1.replace('-', '')) * hstep 142 143 qseq1_pre = "X" if q_triplets_pre else "" 144 qseq1_post = "X" if q_triplets_post else "" 145 qseq1 = seq1("".join(q_triplets), custom_map=custom_map) 146 qstart = hsp_qstart + len(qseq1_pre) 147 qend = qstart + len(qseq1.replace('-', '')) 148 149 # replace the old frag sequences with the new ones 150 frag.hit = None 151 frag.query = None 152 frag.hit = hseq1_pre + hseq1 + hseq1_post 153 frag.query = qseq1_pre + qseq1 + qseq1_post 154 155 # set coordinates for the protein sequence 156 if frag.query_strand == 0: 157 frag.query_start, frag.query_end = qstart, qend 158 elif frag.hit_strand == 0: 159 frag.hit_start, frag.hit_end = hstart, hend 160 161 # update alignment annotation 162 # by turning them into list of triplets 163 for annot, annotseq in frag.aln_annotation.items(): 164 pre, intact, post = _make_triplets(annotseq, phase) 165 frag.aln_annotation[annot] = \ 166 list(filter(None, [pre])) + intact + list(filter(None, [post])) 167 168 # update values for next iteration 169 hsp_hstart, hsp_qstart = hend, qend 170 171 return fraglist
172 173
174 -def _split_fragment(frag):
175 """Split one HSPFragment containing frame-shifted alignment into two (PRIVATE).""" 176 # given an HSPFragment object with frameshift(s), this method splits it 177 # into fragments without frameshifts by sequentially chopping it off 178 # starting from the beginning 179 simil = frag.aln_annotation['similarity'] 180 # we should have at least 1 frame shift for splitting 181 assert simil.count('#') > 0 182 183 split_frags = [] 184 qstep = 1 if frag.query_strand >= 0 else -1 185 hstep = 1 if frag.hit_strand >= 0 else -1 186 qpos = min(frag.query_range) if qstep >= 0 else max(frag.query_range) 187 hpos = min(frag.hit_range) if qstep >= 0 else max(frag.hit_range) 188 abs_pos = 0 189 # split according to hit, then query 190 while simil: 191 192 try: 193 shifts = re.search(_RE_SHIFTS, simil).group(1) 194 s_start = simil.find(shifts) 195 s_stop = s_start + len(shifts) 196 split = frag[abs_pos:abs_pos + s_start] 197 except AttributeError: # no '#' in simil, i.e. last frag 198 shifts = '' 199 s_start = 0 200 s_stop = len(simil) 201 split = frag[abs_pos:] 202 203 # coordinates for the split strand 204 qstart, hstart = qpos, hpos 205 qpos += (len(split) - sum(str(split.query.seq).count(x) 206 for x in ('-', '<', '>'))) * qstep 207 hpos += (len(split) - sum(str(split.hit.seq).count(x) 208 for x in ('-', '<', '>'))) * hstep 209 210 split.hit_start = min(hstart, hpos) 211 split.query_start = min(qstart, qpos) 212 split.hit_end = max(hstart, hpos) 213 split.query_end = max(qstart, qpos) 214 215 # account for frameshift length 216 abs_slice = slice(abs_pos + s_start, abs_pos + s_stop) 217 if len(frag.aln_annotation) == 2: 218 seqs = (str(frag[abs_slice].query.seq), 219 str(frag[abs_slice].hit.seq)) 220 elif len(frag.aln_annotation) == 3: 221 seqs = (frag[abs_slice].aln_annotation['query_annotation'], 222 frag[abs_slice].aln_annotation['hit_annotation'],) 223 if '#' in seqs[0]: 224 qpos += len(shifts) * qstep 225 elif '#' in seqs[1]: 226 hpos += len(shifts) * hstep 227 228 # set frame 229 _set_frame(split) 230 split_frags.append(split) 231 # set similarity string and absolute position for the next loop 232 simil = simil[s_stop:] 233 abs_pos += s_stop 234 235 return split_frags
236 237
238 -def _create_hsp(hid, qid, hspd):
239 """Return a list of HSP objects from the given parsed HSP values (PRIVATE).""" 240 frags = [] 241 # we are iterating over query_ranges, but hit_ranges works just as well 242 for idx, qcoords in enumerate(hspd['query_ranges']): 243 # get sequences, create object 244 hseqlist = hspd.get('hit') 245 hseq = '' if hseqlist is None else hseqlist[idx] 246 qseqlist = hspd.get('query') 247 qseq = '' if qseqlist is None else qseqlist[idx] 248 frag = HSPFragment(hid, qid, hit=hseq, query=qseq) 249 # coordinates 250 frag.query_start = qcoords[0] 251 frag.query_end = qcoords[1] 252 frag.hit_start = hspd['hit_ranges'][idx][0] 253 frag.hit_end = hspd['hit_ranges'][idx][1] 254 # alignment annotation 255 try: 256 aln_annot = hspd.get('aln_annotation', {}) 257 for key, value in aln_annot.items(): 258 frag.aln_annotation[key] = value[idx] 259 except IndexError: 260 pass 261 # strands 262 frag.query_strand = hspd['query_strand'] 263 frag.hit_strand = hspd['hit_strand'] 264 # and append the hsp object to the list 265 if frag.aln_annotation.get('similarity') is not None: 266 if '#' in frag.aln_annotation['similarity']: 267 frags.extend(_split_fragment(frag)) 268 continue 269 # try to set frame if there are translation in the alignment 270 if len(frag.aln_annotation) > 1 or \ 271 frag.query_strand == 0 or \ 272 ('vulgar_comp' in hspd and re.search(_RE_TRANS, hspd['vulgar_comp'])): 273 _set_frame(frag) 274 275 frags.append(frag) 276 277 # if the query is protein, we need to change the hit and query sequences 278 # from three-letter amino acid codes to one letter, and adjust their 279 # coordinates accordingly 280 if len(frags[0].aln_annotation) == 2: # 2 annotations == protein query 281 frags = _adjust_aa_seq(frags) 282 283 hsp = HSP(frags) 284 # set hsp-specific attributes 285 for attr in ('score', 'hit_split_codons', 'query_split_codons', 286 'model', 'vulgar_comp', 'cigar_comp', 'alphabet'): 287 if attr in hspd: 288 setattr(hsp, attr, hspd[attr]) 289 290 return hsp
291 292
293 -def _parse_hit_or_query_line(line):
294 """Parse the 'Query:' line of exonerate alignment outputs (PRIVATE).""" 295 try: 296 mark, id, desc = line.split(' ', 2) 297 except ValueError: # no desc 298 mark, id = line.split(' ', 1) 299 desc = '' 300 301 return id, desc
302 303
304 -class _BaseExonerateParser(object):
305 """Abstract iterator for exonerate format.""" 306 307 _ALN_MARK = None 308
309 - def __init__(self, handle):
310 self.handle = handle 311 self.has_c4_alignment = False
312
313 - def __iter__(self):
314 # read line until the first alignment block or cigar/vulgar lines 315 while True: 316 self.line = self.handle.readline() 317 # flag for human-readable alignment block 318 if self.line.startswith('C4 Alignment:') and not \ 319 self.has_c4_alignment: 320 self.has_c4_alignment = True 321 if self.line.startswith('C4 Alignment:') or \ 322 self.line.startswith('vulgar:') or \ 323 self.line.startswith('cigar:'): 324 break 325 elif not self.line or self.line.startswith('-- completed '): 326 return 327 328 for qresult in self._parse_qresult(): 329 qresult.program = 'exonerate' 330 # HACK: so that all descriptions are set 331 qresult.description = qresult.description 332 for hit in qresult: 333 hit.description = hit.description 334 yield qresult
335
336 - def read_until(self, bool_func):
337 """Read the file handle until the given bool function returns True.""" 338 while True: 339 if not self.line or bool_func(self.line): 340 return 341 else: 342 self.line = self.handle.readline()
343
344 - def parse_alignment_block(self, header):
345 raise NotImplementedError("Subclass must implement this")
346
347 - def _parse_alignment_header(self):
348 # read all header lines and store them 349 aln_header = [] 350 # header is everything before the first empty line 351 while self.line.strip(): 352 aln_header.append(self.line.strip()) 353 self.line = self.handle.readline() 354 # then parse them 355 qresult, hit, hsp = {}, {}, {} 356 for line in aln_header: 357 # query line 358 if line.startswith('Query:'): 359 qresult['id'], qresult['description'] = \ 360 _parse_hit_or_query_line(line) 361 # target line 362 elif line.startswith('Target:'): 363 hit['id'], hit['description'] = \ 364 _parse_hit_or_query_line(line) 365 # model line 366 elif line.startswith('Model:'): 367 qresult['model'] = line.split(' ', 1)[1] 368 # score line 369 elif line.startswith('Raw score:'): 370 hsp['score'] = line.split(' ', 2)[2] 371 # query range line 372 elif line.startswith('Query range:'): 373 # line is always 'Query range: \d+ -> \d+', so we can pluck 374 # the numbers directly 375 hsp['query_start'], hsp['query_end'] = line.split(' ', 4)[2:5:2] 376 # hit range line 377 elif line.startswith('Target range:'): 378 # same logic with query range 379 hsp['hit_start'], hsp['hit_end'] = line.split(' ', 4)[2:5:2] 380 381 # determine strand 382 if qresult['description'].endswith(':[revcomp]'): 383 hsp['query_strand'] = '-' 384 qresult['description'] = qresult['description'].replace(':[revcomp]', '') 385 elif 'protein2' in qresult['model']: 386 hsp['query_strand'] = '.' 387 else: 388 hsp['query_strand'] = '+' 389 if hit['description'].endswith(':[revcomp]'): 390 hsp['hit_strand'] = '-' 391 hit['description'] = hit['description'].replace(':[revcomp]', '') 392 elif '2protein' in qresult['model']: 393 hsp['hit_strand'] = '.' 394 else: 395 hsp['hit_strand'] = '+' 396 397 # NOTE: we haven't processed the coordinates types 398 # and the strands are not yet Biopython's standard (1 / -1 / 0) 399 # since it's easier if we do the conversion later 400 401 return {'qresult': qresult, 'hit': hit, 'hsp': hsp}
402
403 - def _parse_qresult(self):
404 # state values 405 state_EOF = 0 406 state_QRES_NEW = 1 407 state_QRES_SAME = 3 408 state_HIT_NEW = 2 409 state_HIT_SAME = 4 410 # initial dummies 411 qres_state, hit_state = None, None 412 file_state = None 413 cur_qid, cur_hid = None, None 414 prev_qid, prev_hid = None, None 415 cur, prev = None, None 416 hit_list, hsp_list = [], [] 417 # if the file has c4 alignments, use that as the alignment mark 418 if self.has_c4_alignment: 419 self._ALN_MARK = 'C4 Alignment:' 420 421 while True: 422 self.read_until(lambda line: line.startswith(self._ALN_MARK)) 423 if cur is not None: 424 prev = cur 425 prev_qid = cur_qid 426 prev_hid = cur_hid 427 # only parse the result row if it's not EOF 428 if self.line: 429 assert self.line.startswith(self._ALN_MARK), self.line 430 # create temp dicts for storing parsed values 431 header = {'qresult': {}, 'hit': {}, 'hsp': {}} 432 # if the file has c4 alignments, try to parse the header 433 if self.has_c4_alignment: 434 self.read_until(lambda line: 435 line.strip().startswith('Query:')) 436 header = self._parse_alignment_header() 437 # parse the block contents 438 cur = self.parse_alignment_block(header) 439 cur_qid = cur['qresult']['id'] 440 cur_hid = cur['hit']['id'] 441 elif not self.line or self.line.startswith('-- completed '): 442 file_state = state_EOF 443 cur_qid, cur_hid = None, None 444 445 # get the state of hit and qresult 446 if prev_qid != cur_qid: 447 qres_state = state_QRES_NEW 448 else: 449 qres_state = state_QRES_SAME 450 # new hits are hits with different ids or hits in a new query 451 if prev_hid != cur_hid or qres_state == state_QRES_NEW: 452 hit_state = state_HIT_NEW 453 else: 454 hit_state = state_HIT_SAME 455 456 if prev is not None: 457 hsp = _create_hsp(prev_hid, prev_qid, prev['hsp']) 458 hsp_list.append(hsp) 459 460 if hit_state == state_HIT_NEW: 461 hit = Hit(hsp_list) 462 for attr, value in prev['hit'].items(): 463 setattr(hit, attr, value) 464 hit_list.append(hit) 465 hsp_list = [] 466 467 if qres_state == state_QRES_NEW or file_state == state_EOF: 468 qresult = QueryResult(id=prev_qid) 469 for hit in hit_list: 470 # not using append since Exonerate may separate the 471 # same hit if it has different strands 472 qresult.absorb(hit) 473 for attr, value in prev['qresult'].items(): 474 setattr(qresult, attr, value) 475 yield qresult 476 if file_state == state_EOF: 477 break 478 hit_list = [] 479 480 # only readline() here if we're not parsing C4 alignments 481 # C4 alignments readline() is handled by its parse_alignment_block 482 # function 483 if not self.has_c4_alignment: 484 self.line = self.handle.readline()
485 486
487 -class _BaseExonerateIndexer(SearchIndexer):
488 """Indexer class for Exonerate plain text.""" 489 490 _parser = None # should be defined by subclass 491 _query_mark = None # this one too 492
493 - def get_qresult_id(self, pos):
494 raise NotImplementedError("Should be defined by subclass")
495
496 - def __iter__(self):
497 """Iterate over the file handle; yields key, start offset, and length.""" 498 handle = self._handle 499 handle.seek(0) 500 qresult_key = None 501 502 while True: 503 start_offset = handle.tell() 504 line = handle.readline() 505 if line.startswith(self._query_mark): 506 if qresult_key is None: 507 qresult_key = self.get_qresult_id(start_offset) 508 qresult_offset = start_offset 509 else: 510 curr_key = self.get_qresult_id(start_offset) 511 if curr_key != qresult_key: 512 yield qresult_key, qresult_offset, \ 513 start_offset - qresult_offset 514 qresult_key = curr_key 515 qresult_offset = start_offset 516 handle.seek(qresult_offset) 517 elif not line: 518 yield qresult_key, qresult_offset, \ 519 start_offset - qresult_offset 520 break
521 522 523 # if not used as a module, run the doctest 524 if __name__ == "__main__": 525 from Bio._utils import run_doctest 526 run_doctest() 527