Package Bio :: Package SearchIO :: Package ExonerateIO :: Module _base
[hide private]
[frames] | no frames]

Source Code for Module Bio.SearchIO.ExonerateIO._base

  1  # Copyright 2012 by Wibowo Arindrarto.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO abstract base parser for Exonerate standard output format.""" 
  7   
  8  import re 
  9  from functools import reduce 
 10   
 11  from Bio.SearchIO._index import SearchIndexer 
 12  from Bio.SearchIO._model import QueryResult, Hit, HSP, HSPFragment 
 13  from Bio.SeqUtils import seq1 
 14   
 15   
 16  # strand char-value mapping 
 17  _STRAND_MAP = {'+': 1, '-': -1, '.': 0} 
 18   
 19  _RE_SHIFTS = re.compile(r'(#+)') 
 20  # regex for checking whether a vulgar line has protein/translated components 
 21  _RE_TRANS = re.compile(r'[53ISCF]') 
 22   
 23   
24 -def _set_frame(frag):
25 """Sets the HSPFragment frames.""" 26 frag.hit_frame = (frag.hit_start % 3 + 1) * frag.hit_strand 27 frag.query_frame = (frag.query_start % 3 + 1) * frag.query_strand
28 29
30 -def _make_triplets(seq, phase=0):
31 """Selects a valid amino acid sequence given a 3-letter code input. 32 33 This function takes a single three-letter amino acid sequence and the phase 34 of the sequence to return the longest intact amino acid sequence possible. 35 Parts of the input sequence before and after the selected sequence are also 36 returned. 37 38 This is an internal private function and is meant for parsing Exonerate's 39 three-letter amino acid output. 40 41 >>> from Bio.SearchIO.ExonerateIO._base import _make_triplets 42 >>> _make_triplets('GlyThrSerAlaPro') 43 ('', ['Gly', 'Thr', 'Ser', 'Ala', 'Pro'], '') 44 >>> _make_triplets('yThSerAla', phase=1) 45 ('y', ['Thr', 'Ser', 'Ala'], '') 46 >>> _make_triplets('yThSerAlaPr', phase=1) 47 ('y', ['Thr', 'Ser', 'Ala'], 'Pr') 48 49 """ 50 pre = seq[:phase] 51 np_seq = seq[phase:] 52 non_triplets = len(np_seq) % 3 53 post = "" if not non_triplets else np_seq[-1 * non_triplets:] 54 intacts = [np_seq[3 * i:3 * (i + 1)] 55 for i in range(len(np_seq) // 3)] 56 return pre, intacts, post
57 58
59 -def _get_fragments_coord(frags):
60 """Returns the letter coordinate of the given list of fragments. 61 62 This function takes a list of three-letter amino acid sequences and 63 returns a list of coordinates for each fragment had all the input 64 sequences been flattened. 65 66 This is an internal private function and is meant for parsing Exonerate's 67 three-letter amino acid output. 68 69 >>> from Bio.SearchIO.ExonerateIO._base import _get_fragments_coord 70 >>> _get_fragments_coord(['Thr', 'Ser', 'Ala']) 71 [0, 3, 6] 72 >>> _get_fragments_coord(['Thr', 'SerAlaPro', 'GlyLeu']) 73 [0, 3, 12, ] 74 >>> _get_fragments_coord(['Thr', 'SerAlaPro', 'GlyLeu', 'Cys']) 75 [0, 3, 12, 18] 76 77 """ 78 if not frags: 79 return [] 80 # first fragment always starts from position 0 81 init = [0] 82 return reduce(lambda acc, frag: acc + [acc[-1] + len(frag)], 83 frags[:-1], init)
84 85
86 -def _get_fragments_phase(frags):
87 """Returns the phases of the given list of 3-letter amino acid fragments. 88 89 This is an internal private function and is meant for parsing Exonerate's 90 three-letter amino acid output. 91 92 >>> from Bio.SearchIO.ExonerateIO._base import _get_fragments_phase 93 >>> _get_fragments_phase(['Thr', 'Ser', 'Ala']) 94 [0, 0, 0] 95 >>> _get_fragments_phase(['ThrSe', 'rAla']) 96 [0, 1] 97 >>> _get_fragments_phase(['ThrSe', 'rAlaLeu', 'ProCys']) 98 [0, 1, 0] 99 >>> _get_fragments_phase(['ThrSe', 'rAlaLeuP', 'roCys']) 100 [0, 1, 2] 101 >>> _get_fragments_phase(['ThrSe', 'rAlaLeuPr', 'oCys']) 102 [0, 1, 1] 103 104 """ 105 return [(3 - (x % 3)) % 3 for x in _get_fragments_coord(frags)]
106 107
108 -def _adjust_aa_seq(fraglist):
109 """Transforms three-letter amino acid codes into one-letters in the 110 given HSPFragments.""" 111 custom_map = {'***': '*', '<->': '-'} 112 hsp_hstart = fraglist[0].hit_start 113 hsp_qstart = fraglist[0].query_start 114 frag_phases = _get_fragments_phase(fraglist) 115 for frag, phase in zip(fraglist, frag_phases): 116 assert frag.query_strand == 0 or frag.hit_strand == 0 117 # hit step may be -1 as we're aligning to DNA 118 hstep = 1 if frag.hit_strand >= 0 else -1 119 120 # set fragment phase 121 frag.phase = phase 122 123 # fragment should have a length that is a multiple of 3 124 # assert len(frag) % 3 == 0 125 qseq = str(frag.query.seq) 126 q_triplets_pre, q_triplets, q_triplets_post = \ 127 _make_triplets(qseq, phase) 128 129 hseq = str(frag.hit.seq) 130 h_triplets_pre, h_triplets, h_triplets_post = \ 131 _make_triplets(hseq, phase) 132 133 # get one letter codes 134 # and replace gap codon markers and termination characters 135 hseq1_pre = "X" if h_triplets_pre else "" 136 hseq1_post = "X" if h_triplets_post else "" 137 hseq1 = seq1("".join(h_triplets), custom_map=custom_map) 138 hstart = hsp_hstart + (len(hseq1_pre) * hstep) 139 hend = hstart + len(hseq1.replace('-', '')) * hstep 140 141 qseq1_pre = "X" if q_triplets_pre else "" 142 qseq1_post = "X" if q_triplets_post else "" 143 qseq1 = seq1("".join(q_triplets), custom_map=custom_map) 144 qstart = hsp_qstart + len(qseq1_pre) 145 qend = qstart + len(qseq1.replace('-', '')) 146 147 # replace the old frag sequences with the new ones 148 frag.hit = None 149 frag.query = None 150 frag.hit = hseq1_pre + hseq1 + hseq1_post 151 frag.query = qseq1_pre + qseq1 + qseq1_post 152 153 # set coordinates for the protein sequence 154 if frag.query_strand == 0: 155 frag.query_start, frag.query_end = qstart, qend 156 elif frag.hit_strand == 0: 157 frag.hit_start, frag.hit_end = hstart, hend 158 159 # update alignment annotation 160 # by turning them into list of triplets 161 for annot, annotseq in frag.aln_annotation.items(): 162 pre, intact, post = _make_triplets(annotseq, phase) 163 frag.aln_annotation[annot] = \ 164 list(filter(None, [pre])) + intact + list(filter(None, [post])) 165 166 # update values for next iteration 167 hsp_hstart, hsp_qstart = hend, qend 168 169 return fraglist
170 171
172 -def _split_fragment(frag):
173 """Splits one HSPFragment containing frame-shifted alignment into two.""" 174 # given an HSPFragment object with frameshift(s), this method splits it 175 # into fragments without frameshifts by sequentially chopping it off 176 # starting from the beginning 177 simil = frag.aln_annotation['similarity'] 178 # we should have at least 1 frame shift for splitting 179 assert simil.count('#') > 0 180 181 split_frags = [] 182 qstep = 1 if frag.query_strand >= 0 else -1 183 hstep = 1 if frag.hit_strand >= 0 else -1 184 qpos = min(frag.query_range) if qstep >= 0 else max(frag.query_range) 185 hpos = min(frag.hit_range) if qstep >= 0 else max(frag.hit_range) 186 abs_pos = 0 187 # split according to hit, then query 188 while simil: 189 190 try: 191 shifts = re.search(_RE_SHIFTS, simil).group(1) 192 s_start = simil.find(shifts) 193 s_stop = s_start + len(shifts) 194 split = frag[abs_pos:abs_pos + s_start] 195 except AttributeError: # no '#' in simil, i.e. last frag 196 shifts = '' 197 s_start = 0 198 s_stop = len(simil) 199 split = frag[abs_pos:] 200 201 # coordinates for the split strand 202 qstart, hstart = qpos, hpos 203 qpos += (len(split) - sum(str(split.query.seq).count(x) 204 for x in ('-', '<', '>'))) * qstep 205 hpos += (len(split) - sum(str(split.hit.seq).count(x) 206 for x in ('-', '<', '>'))) * hstep 207 208 split.hit_start = min(hstart, hpos) 209 split.query_start = min(qstart, qpos) 210 split.hit_end = max(hstart, hpos) 211 split.query_end = max(qstart, qpos) 212 213 # account for frameshift length 214 abs_slice = slice(abs_pos + s_start, abs_pos + s_stop) 215 if len(frag.aln_annotation) == 2: 216 seqs = (str(frag[abs_slice].query.seq), 217 str(frag[abs_slice].hit.seq)) 218 elif len(frag.aln_annotation) == 3: 219 seqs = (frag[abs_slice].aln_annotation['query_annotation'], 220 frag[abs_slice].aln_annotation['hit_annotation'],) 221 if '#' in seqs[0]: 222 qpos += len(shifts) * qstep 223 elif '#' in seqs[1]: 224 hpos += len(shifts) * hstep 225 226 # set frame 227 _set_frame(split) 228 split_frags.append(split) 229 # set similarity string and absolute position for the next loop 230 simil = simil[s_stop:] 231 abs_pos += s_stop 232 233 return split_frags
234 235
236 -def _create_hsp(hid, qid, hspd):
237 """Returns a list of HSP objects from the given parsed HSP values.""" 238 frags = [] 239 # we are iterating over query_ranges, but hit_ranges works just as well 240 for idx, qcoords in enumerate(hspd['query_ranges']): 241 # get sequences, create object 242 hseqlist = hspd.get('hit') 243 hseq = '' if hseqlist is None else hseqlist[idx] 244 qseqlist = hspd.get('query') 245 qseq = '' if qseqlist is None else qseqlist[idx] 246 frag = HSPFragment(hid, qid, hit=hseq, query=qseq) 247 # coordinates 248 frag.query_start = qcoords[0] 249 frag.query_end = qcoords[1] 250 frag.hit_start = hspd['hit_ranges'][idx][0] 251 frag.hit_end = hspd['hit_ranges'][idx][1] 252 # alignment annotation 253 try: 254 aln_annot = hspd.get('aln_annotation', {}) 255 for key, value in aln_annot.items(): 256 frag.aln_annotation[key] = value[idx] 257 except IndexError: 258 pass 259 # strands 260 frag.query_strand = hspd['query_strand'] 261 frag.hit_strand = hspd['hit_strand'] 262 # and append the hsp object to the list 263 if frag.aln_annotation.get('similarity') is not None: 264 if '#' in frag.aln_annotation['similarity']: 265 frags.extend(_split_fragment(frag)) 266 continue 267 # try to set frame if there are translation in the alignment 268 if len(frag.aln_annotation) > 1 or \ 269 frag.query_strand == 0 or \ 270 ('vulgar_comp' in hspd and re.search(_RE_TRANS, hspd['vulgar_comp'])): 271 _set_frame(frag) 272 273 frags.append(frag) 274 275 # if the query is protein, we need to change the hit and query sequences 276 # from three-letter amino acid codes to one letter, and adjust their 277 # coordinates accordingly 278 if len(frags[0].aln_annotation) == 2: # 2 annotations == protein query 279 frags = _adjust_aa_seq(frags) 280 281 hsp = HSP(frags) 282 # set hsp-specific attributes 283 for attr in ('score', 'hit_split_codons', 'query_split_codons', 284 'model', 'vulgar_comp', 'cigar_comp', 'alphabet'): 285 if attr in hspd: 286 setattr(hsp, attr, hspd[attr]) 287 288 return hsp
289 290
291 -def _parse_hit_or_query_line(line):
292 """Parse the 'Query:' line of exonerate alignment outputs.""" 293 try: 294 mark, id, desc = line.split(' ', 2) 295 except ValueError: # no desc 296 mark, id = line.split(' ', 1) 297 desc = '' 298 299 return id, desc
300 301
302 -class _BaseExonerateParser(object):
303 304 """Abstract iterator for exonerate format.""" 305 306 _ALN_MARK = None 307
308 - def __init__(self, handle):
309 self.handle = handle 310 self.has_c4_alignment = False
311
312 - def __iter__(self):
313 # read line until the first alignment block or cigar/vulgar lines 314 while True: 315 self.line = self.handle.readline() 316 # flag for human-readable alignment block 317 if self.line.startswith('C4 Alignment:') and not \ 318 self.has_c4_alignment: 319 self.has_c4_alignment = True 320 if self.line.startswith('C4 Alignment:') or \ 321 self.line.startswith('vulgar:') or \ 322 self.line.startswith('cigar:'): 323 break 324 elif not self.line or self.line.startswith('-- completed '): 325 raise StopIteration 326 327 for qresult in self._parse_qresult(): 328 qresult.program = 'exonerate' 329 # HACK: so that all descriptions are set 330 qresult.description = qresult.description 331 for hit in qresult: 332 hit.description = hit.description 333 yield qresult
334
335 - def read_until(self, bool_func):
336 """Reads the file handle until the given bool function returns True.""" 337 while True: 338 if not self.line or bool_func(self.line): 339 return 340 else: 341 self.line = self.handle.readline()
342
343 - def parse_alignment_block(self, header):
344 raise NotImplementedError("Subclass must implement this")
345
346 - def _parse_alignment_header(self):
347 # read all header lines and store them 348 aln_header = [] 349 # header is everything before the first empty line 350 while self.line.strip(): 351 aln_header.append(self.line.strip()) 352 self.line = self.handle.readline() 353 # then parse them 354 qresult, hit, hsp = {}, {}, {} 355 for line in aln_header: 356 # query line 357 if line.startswith('Query:'): 358 qresult['id'], qresult['description'] = \ 359 _parse_hit_or_query_line(line) 360 # target line 361 elif line.startswith('Target:'): 362 hit['id'], hit['description'] = \ 363 _parse_hit_or_query_line(line) 364 # model line 365 elif line.startswith('Model:'): 366 qresult['model'] = line.split(' ', 1)[1] 367 # score line 368 elif line.startswith('Raw score:'): 369 hsp['score'] = line.split(' ', 2)[2] 370 # query range line 371 elif line.startswith('Query range:'): 372 # line is always 'Query range: \d+ -> \d+', so we can pluck 373 # the numbers directly 374 hsp['query_start'], hsp['query_end'] = line.split(' ', 4)[2:5:2] 375 # hit range line 376 elif line.startswith('Target range:'): 377 # same logic with query range 378 hsp['hit_start'], hsp['hit_end'] = line.split(' ', 4)[2:5:2] 379 380 # determine strand 381 if qresult['description'].endswith(':[revcomp]'): 382 hsp['query_strand'] = '-' 383 qresult['description'] = qresult['description'].replace(':[revcomp]', '') 384 elif 'protein2' in qresult['model']: 385 hsp['query_strand'] = '.' 386 else: 387 hsp['query_strand'] = '+' 388 if hit['description'].endswith(':[revcomp]'): 389 hsp['hit_strand'] = '-' 390 hit['description'] = hit['description'].replace(':[revcomp]', '') 391 elif '2protein' in qresult['model']: 392 hsp['hit_strand'] = '.' 393 else: 394 hsp['hit_strand'] = '+' 395 396 # NOTE: we haven't processed the coordinates types 397 # and the strands are not yet Biopython's standard (1 / -1 / 0) 398 # since it's easier if we do the conversion later 399 400 return {'qresult': qresult, 'hit': hit, 'hsp': hsp}
401
402 - def _parse_qresult(self):
403 # state values 404 state_EOF = 0 405 state_QRES_NEW = 1 406 state_QRES_SAME = 3 407 state_HIT_NEW = 2 408 state_HIT_SAME = 4 409 # initial dummies 410 qres_state, hit_state = None, None 411 file_state = None 412 cur_qid, cur_hid = None, None 413 prev_qid, prev_hid = None, None 414 cur, prev = None, None 415 hit_list, hsp_list = [], [] 416 # if the file has c4 alignments, use that as the alignment mark 417 if self.has_c4_alignment: 418 self._ALN_MARK = 'C4 Alignment:' 419 420 while True: 421 self.read_until(lambda line: line.startswith(self._ALN_MARK)) 422 if cur is not None: 423 prev = cur 424 prev_qid = cur_qid 425 prev_hid = cur_hid 426 # only parse the result row if it's not EOF 427 if self.line: 428 assert self.line.startswith(self._ALN_MARK), self.line 429 # create temp dicts for storing parsed values 430 header = {'qresult': {}, 'hit': {}, 'hsp': {}} 431 # if the file has c4 alignments, try to parse the header 432 if self.has_c4_alignment: 433 self.read_until(lambda line: 434 line.strip().startswith('Query:')) 435 header = self._parse_alignment_header() 436 # parse the block contents 437 cur = self.parse_alignment_block(header) 438 cur_qid = cur['qresult']['id'] 439 cur_hid = cur['hit']['id'] 440 elif not self.line or self.line.startswith('-- completed '): 441 file_state = state_EOF 442 cur_qid, cur_hid = None, None 443 444 # get the state of hit and qresult 445 if prev_qid != cur_qid: 446 qres_state = state_QRES_NEW 447 else: 448 qres_state = state_QRES_SAME 449 # new hits are hits with different ids or hits in a new query 450 if prev_hid != cur_hid or qres_state == state_QRES_NEW: 451 hit_state = state_HIT_NEW 452 else: 453 hit_state = state_HIT_SAME 454 455 if prev is not None: 456 hsp = _create_hsp(prev_hid, prev_qid, prev['hsp']) 457 hsp_list.append(hsp) 458 459 if hit_state == state_HIT_NEW: 460 hit = Hit(hsp_list) 461 for attr, value in prev['hit'].items(): 462 setattr(hit, attr, value) 463 hit_list.append(hit) 464 hsp_list = [] 465 466 if qres_state == state_QRES_NEW or file_state == state_EOF: 467 qresult = QueryResult(id=prev_qid) 468 for hit in hit_list: 469 # not using append since Exonerate may separate the 470 # same hit if it has different strands 471 qresult.absorb(hit) 472 for attr, value in prev['qresult'].items(): 473 setattr(qresult, attr, value) 474 yield qresult 475 if file_state == state_EOF: 476 break 477 hit_list = [] 478 479 # only readline() here if we're not parsing C4 alignments 480 # C4 alignments readline() is handled by its parse_alignment_block 481 # function 482 if not self.has_c4_alignment: 483 self.line = self.handle.readline()
484 485
486 -class _BaseExonerateIndexer(SearchIndexer):
487 488 """Indexer class for Exonerate plain text.""" 489 490 _parser = None # should be defined by subclass 491 _query_mark = None # this one too 492
493 - def get_qresult_id(self, pos):
494 raise NotImplementedError("Should be defined by subclass")
495
496 - def __iter__(self):
497 """Iterates over the file handle; yields key, start offset, and length.""" 498 handle = self._handle 499 handle.seek(0) 500 qresult_key = None 501 502 while True: 503 start_offset = handle.tell() 504 line = handle.readline() 505 if line.startswith(self._query_mark): 506 if qresult_key is None: 507 qresult_key = self.get_qresult_id(start_offset) 508 qresult_offset = start_offset 509 else: 510 curr_key = self.get_qresult_id(start_offset) 511 if curr_key != qresult_key: 512 yield qresult_key, qresult_offset, \ 513 start_offset - qresult_offset 514 qresult_key = curr_key 515 qresult_offset = start_offset 516 handle.seek(qresult_offset) 517 elif not line: 518 yield qresult_key, qresult_offset, \ 519 start_offset - qresult_offset 520 break
521 522 523 # if not used as a module, run the doctest 524 if __name__ == "__main__": 525 from Bio._utils import run_doctest 526 run_doctest() 527