Package Bio :: Package SearchIO :: Package ExonerateIO :: Module _base
[hide private]
[frames] | no frames]

Source Code for Module Bio.SearchIO.ExonerateIO._base

  1  # Copyright 2012 by Wibowo Arindrarto.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO abstract base parser for Exonerate standard output format.""" 
  7   
  8  import re 
  9  from functools import reduce 
 10   
 11  from Bio.SearchIO._index import SearchIndexer 
 12  from Bio.SearchIO._model import QueryResult, Hit, HSP, HSPFragment 
 13  from Bio.SeqUtils import seq1 
 14   
 15   
 16  # strand char-value mapping 
 17  _STRAND_MAP = {'+': 1, '-': -1, '.': 0} 
 18   
 19  _RE_SHIFTS = re.compile(r'(#+)') 
 20  # regex for checking whether a vulgar line has protein/translated components 
 21  _RE_TRANS = re.compile(r'[53ISCF]') 
 22   
 23   
24 -def _set_frame(frag):
25 """Sets the HSPFragment frames.""" 26 frag.hit_frame = (frag.hit_start % 3 + 1) * frag.hit_strand 27 frag.query_frame = (frag.query_start % 3 + 1) * frag.query_strand
28 29
30 -def _make_triplets(seq, phase=0):
31 """Selects a valid amino acid sequence given a 3-letter code input. 32 33 This function takes a single three-letter amino acid sequence and the phase 34 of the sequence to return the longest intact amino acid sequence possible. 35 Parts of the input sequence before and after the selected sequence are also 36 returned. 37 38 This is an internal private function and is meant for parsing Exonerate's 39 three-letter amino acid output. 40 41 >>> from Bio.SearchIO.ExonerateIO._base import _make_triplets 42 >>> _make_triplets('GlyThrSerAlaPro') 43 ('', ['Gly', 'Thr', 'Ser', 'Ala', 'Pro'], '') 44 >>> _make_triplets('yThSerAla', phase=1) 45 ('y', ['Thr', 'Ser', 'Ala'], '') 46 >>> _make_triplets('yThSerAlaPr', phase=1) 47 ('y', ['Thr', 'Ser', 'Ala'], 'Pr') 48 49 """ 50 pre = seq[:phase] 51 np_seq = seq[phase:] 52 non_triplets = len(np_seq) % 3 53 post = "" if not non_triplets else np_seq[-1 * non_triplets:] 54 intacts = [np_seq[3 * i:3 * (i + 1)] 55 for i in range(len(np_seq) // 3)] 56 return pre, intacts, post
57 58
59 -def _get_fragments_coord(frags):
60 """Returns the letter coordinate of the given list of fragments. 61 62 This function takes a list of three-letter amino acid sequences and 63 returns a list of coordinates for each fragment had all the input 64 sequences been flattened. 65 66 This is an internal private function and is meant for parsing Exonerate's 67 three-letter amino acid output. 68 69 >>> from Bio.SearchIO.ExonerateIO._base import _get_fragments_coord 70 >>> _get_fragments_coord(['Thr', 'Ser', 'Ala']) 71 [0, 3, 6] 72 >>> _get_fragments_coord(['Thr', 'SerAlaPro', 'GlyLeu']) 73 [0, 3, 12, ] 74 >>> _get_fragments_coord(['Thr', 'SerAlaPro', 'GlyLeu', 'Cys']) 75 [0, 3, 12, 18] 76 77 """ 78 if not frags: 79 return [] 80 # first fragment always starts from position 0 81 init = [0] 82 return reduce(lambda acc, frag: acc + [acc[-1] + len(frag)], 83 frags[:-1], init)
84 85
86 -def _get_fragments_phase(frags):
87 """Returns the phases of the given list of 3-letter amino acid fragments. 88 89 This is an internal private function and is meant for parsing Exonerate's 90 three-letter amino acid output. 91 92 >>> from Bio.SearchIO.ExonerateIO._base import _get_fragments_phase 93 >>> _get_fragments_phase(['Thr', 'Ser', 'Ala']) 94 [0, 0, 0] 95 >>> _get_fragments_phase(['ThrSe', 'rAla']) 96 [0, 1] 97 >>> _get_fragments_phase(['ThrSe', 'rAlaLeu', 'ProCys']) 98 [0, 1, 0] 99 >>> _get_fragments_phase(['ThrSe', 'rAlaLeuP', 'roCys']) 100 [0, 1, 2] 101 >>> _get_fragments_phase(['ThrSe', 'rAlaLeuPr', 'oCys']) 102 [0, 1, 1] 103 104 """ 105 return [(3 - (x % 3)) % 3 for x in _get_fragments_coord(frags)]
106 107
108 -def _adjust_aa_seq(fraglist):
109 """Transforms three-letter amino acid codes into one-letters in the 110 given HSPFragments. 111 """ 112 custom_map = {'***': '*', '<->': '-'} 113 hsp_hstart = fraglist[0].hit_start 114 hsp_qstart = fraglist[0].query_start 115 frag_phases = _get_fragments_phase(fraglist) 116 for frag, phase in zip(fraglist, frag_phases): 117 assert frag.query_strand == 0 or frag.hit_strand == 0 118 # hit step may be -1 as we're aligning to DNA 119 hstep = 1 if frag.hit_strand >= 0 else -1 120 121 # set fragment phase 122 frag.phase = phase 123 124 # fragment should have a length that is a multiple of 3 125 # assert len(frag) % 3 == 0 126 qseq = str(frag.query.seq) 127 q_triplets_pre, q_triplets, q_triplets_post = \ 128 _make_triplets(qseq, phase) 129 130 hseq = str(frag.hit.seq) 131 h_triplets_pre, h_triplets, h_triplets_post = \ 132 _make_triplets(hseq, phase) 133 134 # get one letter codes 135 # and replace gap codon markers and termination characters 136 hseq1_pre = "X" if h_triplets_pre else "" 137 hseq1_post = "X" if h_triplets_post else "" 138 hseq1 = seq1("".join(h_triplets), custom_map=custom_map) 139 hstart = hsp_hstart + (len(hseq1_pre) * hstep) 140 hend = hstart + len(hseq1.replace('-', '')) * hstep 141 142 qseq1_pre = "X" if q_triplets_pre else "" 143 qseq1_post = "X" if q_triplets_post else "" 144 qseq1 = seq1("".join(q_triplets), custom_map=custom_map) 145 qstart = hsp_qstart + len(qseq1_pre) 146 qend = qstart + len(qseq1.replace('-', '')) 147 148 # replace the old frag sequences with the new ones 149 frag.hit = None 150 frag.query = None 151 frag.hit = hseq1_pre + hseq1 + hseq1_post 152 frag.query = qseq1_pre + qseq1 + qseq1_post 153 154 # set coordinates for the protein sequence 155 if frag.query_strand == 0: 156 frag.query_start, frag.query_end = qstart, qend 157 elif frag.hit_strand == 0: 158 frag.hit_start, frag.hit_end = hstart, hend 159 160 # update alignment annotation 161 # by turning them into list of triplets 162 for annot, annotseq in frag.aln_annotation.items(): 163 pre, intact, post = _make_triplets(annotseq, phase) 164 frag.aln_annotation[annot] = \ 165 list(filter(None, [pre])) + intact + list(filter(None, [post])) 166 167 # update values for next iteration 168 hsp_hstart, hsp_qstart = hend, qend 169 170 return fraglist
171 172
173 -def _split_fragment(frag):
174 """Splits one HSPFragment containing frame-shifted alignment into two.""" 175 # given an HSPFragment object with frameshift(s), this method splits it 176 # into fragments without frameshifts by sequentially chopping it off 177 # starting from the beginning 178 simil = frag.aln_annotation['similarity'] 179 # we should have at least 1 frame shift for splitting 180 assert simil.count('#') > 0 181 182 split_frags = [] 183 qstep = 1 if frag.query_strand >= 0 else -1 184 hstep = 1 if frag.hit_strand >= 0 else -1 185 qpos = min(frag.query_range) if qstep >= 0 else max(frag.query_range) 186 hpos = min(frag.hit_range) if qstep >= 0 else max(frag.hit_range) 187 abs_pos = 0 188 # split according to hit, then query 189 while simil: 190 191 try: 192 shifts = re.search(_RE_SHIFTS, simil).group(1) 193 s_start = simil.find(shifts) 194 s_stop = s_start + len(shifts) 195 split = frag[abs_pos:abs_pos + s_start] 196 except AttributeError: # no '#' in simil, i.e. last frag 197 shifts = '' 198 s_start = 0 199 s_stop = len(simil) 200 split = frag[abs_pos:] 201 202 # coordinates for the split strand 203 qstart, hstart = qpos, hpos 204 qpos += (len(split) - sum(str(split.query.seq).count(x) 205 for x in ('-', '<', '>'))) * qstep 206 hpos += (len(split) - sum(str(split.hit.seq).count(x) 207 for x in ('-', '<', '>'))) * hstep 208 209 split.hit_start = min(hstart, hpos) 210 split.query_start = min(qstart, qpos) 211 split.hit_end = max(hstart, hpos) 212 split.query_end = max(qstart, qpos) 213 214 # account for frameshift length 215 abs_slice = slice(abs_pos + s_start, abs_pos + s_stop) 216 if len(frag.aln_annotation) == 2: 217 seqs = (str(frag[abs_slice].query.seq), 218 str(frag[abs_slice].hit.seq)) 219 elif len(frag.aln_annotation) == 3: 220 seqs = (frag[abs_slice].aln_annotation['query_annotation'], 221 frag[abs_slice].aln_annotation['hit_annotation'],) 222 if '#' in seqs[0]: 223 qpos += len(shifts) * qstep 224 elif '#' in seqs[1]: 225 hpos += len(shifts) * hstep 226 227 # set frame 228 _set_frame(split) 229 split_frags.append(split) 230 # set similarity string and absolute position for the next loop 231 simil = simil[s_stop:] 232 abs_pos += s_stop 233 234 return split_frags
235 236
237 -def _create_hsp(hid, qid, hspd):
238 """Returns a list of HSP objects from the given parsed HSP values.""" 239 frags = [] 240 # we are iterating over query_ranges, but hit_ranges works just as well 241 for idx, qcoords in enumerate(hspd['query_ranges']): 242 # get sequences, create object 243 hseqlist = hspd.get('hit') 244 hseq = '' if hseqlist is None else hseqlist[idx] 245 qseqlist = hspd.get('query') 246 qseq = '' if qseqlist is None else qseqlist[idx] 247 frag = HSPFragment(hid, qid, hit=hseq, query=qseq) 248 # coordinates 249 frag.query_start = qcoords[0] 250 frag.query_end = qcoords[1] 251 frag.hit_start = hspd['hit_ranges'][idx][0] 252 frag.hit_end = hspd['hit_ranges'][idx][1] 253 # alignment annotation 254 try: 255 aln_annot = hspd.get('aln_annotation', {}) 256 for key, value in aln_annot.items(): 257 frag.aln_annotation[key] = value[idx] 258 except IndexError: 259 pass 260 # strands 261 frag.query_strand = hspd['query_strand'] 262 frag.hit_strand = hspd['hit_strand'] 263 # and append the hsp object to the list 264 if frag.aln_annotation.get('similarity') is not None: 265 if '#' in frag.aln_annotation['similarity']: 266 frags.extend(_split_fragment(frag)) 267 continue 268 # try to set frame if there are translation in the alignment 269 if len(frag.aln_annotation) > 1 or \ 270 frag.query_strand == 0 or \ 271 ('vulgar_comp' in hspd and re.search(_RE_TRANS, hspd['vulgar_comp'])): 272 _set_frame(frag) 273 274 frags.append(frag) 275 276 # if the query is protein, we need to change the hit and query sequences 277 # from three-letter amino acid codes to one letter, and adjust their 278 # coordinates accordingly 279 if len(frags[0].aln_annotation) == 2: # 2 annotations == protein query 280 frags = _adjust_aa_seq(frags) 281 282 hsp = HSP(frags) 283 # set hsp-specific attributes 284 for attr in ('score', 'hit_split_codons', 'query_split_codons', 285 'model', 'vulgar_comp', 'cigar_comp', 'alphabet'): 286 if attr in hspd: 287 setattr(hsp, attr, hspd[attr]) 288 289 return hsp
290 291
292 -def _parse_hit_or_query_line(line):
293 """Parse the 'Query:' line of exonerate alignment outputs.""" 294 try: 295 mark, id, desc = line.split(' ', 2) 296 except ValueError: # no desc 297 mark, id = line.split(' ', 1) 298 desc = '' 299 300 return id, desc
301 302
303 -class _BaseExonerateParser(object):
304 """Abstract iterator for exonerate format.""" 305 306 _ALN_MARK = None 307
308 - def __init__(self, handle):
309 self.handle = handle 310 self.has_c4_alignment = False
311
312 - def __iter__(self):
313 # read line until the first alignment block or cigar/vulgar lines 314 while True: 315 self.line = self.handle.readline() 316 # flag for human-readable alignment block 317 if self.line.startswith('C4 Alignment:') and not \ 318 self.has_c4_alignment: 319 self.has_c4_alignment = True 320 if self.line.startswith('C4 Alignment:') or \ 321 self.line.startswith('vulgar:') or \ 322 self.line.startswith('cigar:'): 323 break 324 elif not self.line or self.line.startswith('-- completed '): 325 raise StopIteration 326 327 for qresult in self._parse_qresult(): 328 qresult.program = 'exonerate' 329 # HACK: so that all descriptions are set 330 qresult.description = qresult.description 331 for hit in qresult: 332 hit.description = hit.description 333 yield qresult
334
335 - def read_until(self, bool_func):
336 """Reads the file handle until the given bool function returns True.""" 337 while True: 338 if not self.line or bool_func(self.line): 339 return 340 else: 341 self.line = self.handle.readline()
342
343 - def parse_alignment_block(self, header):
344 raise NotImplementedError("Subclass must implement this")
345
346 - def _parse_alignment_header(self):
347 # read all header lines and store them 348 aln_header = [] 349 # header is everything before the first empty line 350 while self.line.strip(): 351 aln_header.append(self.line.strip()) 352 self.line = self.handle.readline() 353 # then parse them 354 qresult, hit, hsp = {}, {}, {} 355 for line in aln_header: 356 # query line 357 if line.startswith('Query:'): 358 qresult['id'], qresult['description'] = \ 359 _parse_hit_or_query_line(line) 360 # target line 361 elif line.startswith('Target:'): 362 hit['id'], hit['description'] = \ 363 _parse_hit_or_query_line(line) 364 # model line 365 elif line.startswith('Model:'): 366 qresult['model'] = line.split(' ', 1)[1] 367 # score line 368 elif line.startswith('Raw score:'): 369 hsp['score'] = line.split(' ', 2)[2] 370 # query range line 371 elif line.startswith('Query range:'): 372 # line is always 'Query range: \d+ -> \d+', so we can pluck 373 # the numbers directly 374 hsp['query_start'], hsp['query_end'] = line.split(' ', 4)[2:5:2] 375 # hit range line 376 elif line.startswith('Target range:'): 377 # same logic with query range 378 hsp['hit_start'], hsp['hit_end'] = line.split(' ', 4)[2:5:2] 379 380 # determine strand 381 if qresult['description'].endswith(':[revcomp]'): 382 hsp['query_strand'] = '-' 383 qresult['description'] = qresult['description'].replace(':[revcomp]', '') 384 elif 'protein2' in qresult['model']: 385 hsp['query_strand'] = '.' 386 else: 387 hsp['query_strand'] = '+' 388 if hit['description'].endswith(':[revcomp]'): 389 hsp['hit_strand'] = '-' 390 hit['description'] = hit['description'].replace(':[revcomp]', '') 391 elif '2protein' in qresult['model']: 392 hsp['hit_strand'] = '.' 393 else: 394 hsp['hit_strand'] = '+' 395 396 # NOTE: we haven't processed the coordinates types 397 # and the strands are not yet Biopython's standard (1 / -1 / 0) 398 # since it's easier if we do the conversion later 399 400 return {'qresult': qresult, 'hit': hit, 'hsp': hsp}
401
402 - def _parse_qresult(self):
403 # state values 404 state_EOF = 0 405 state_QRES_NEW = 1 406 state_QRES_SAME = 3 407 state_HIT_NEW = 2 408 state_HIT_SAME = 4 409 # initial dummies 410 qres_state, hit_state = None, None 411 file_state = None 412 cur_qid, cur_hid = None, None 413 prev_qid, prev_hid = None, None 414 cur, prev = None, None 415 hit_list, hsp_list = [], [] 416 # if the file has c4 alignments, use that as the alignment mark 417 if self.has_c4_alignment: 418 self._ALN_MARK = 'C4 Alignment:' 419 420 while True: 421 self.read_until(lambda line: line.startswith(self._ALN_MARK)) 422 if cur is not None: 423 prev = cur 424 prev_qid = cur_qid 425 prev_hid = cur_hid 426 # only parse the result row if it's not EOF 427 if self.line: 428 assert self.line.startswith(self._ALN_MARK), self.line 429 # create temp dicts for storing parsed values 430 header = {'qresult': {}, 'hit': {}, 'hsp': {}} 431 # if the file has c4 alignments, try to parse the header 432 if self.has_c4_alignment: 433 self.read_until(lambda line: 434 line.strip().startswith('Query:')) 435 header = self._parse_alignment_header() 436 # parse the block contents 437 cur = self.parse_alignment_block(header) 438 cur_qid = cur['qresult']['id'] 439 cur_hid = cur['hit']['id'] 440 elif not self.line or self.line.startswith('-- completed '): 441 file_state = state_EOF 442 cur_qid, cur_hid = None, None 443 444 # get the state of hit and qresult 445 if prev_qid != cur_qid: 446 qres_state = state_QRES_NEW 447 else: 448 qres_state = state_QRES_SAME 449 # new hits are hits with different ids or hits in a new query 450 if prev_hid != cur_hid or qres_state == state_QRES_NEW: 451 hit_state = state_HIT_NEW 452 else: 453 hit_state = state_HIT_SAME 454 455 if prev is not None: 456 hsp = _create_hsp(prev_hid, prev_qid, prev['hsp']) 457 hsp_list.append(hsp) 458 459 if hit_state == state_HIT_NEW: 460 hit = Hit(hsp_list) 461 for attr, value in prev['hit'].items(): 462 setattr(hit, attr, value) 463 hit_list.append(hit) 464 hsp_list = [] 465 466 if qres_state == state_QRES_NEW or file_state == state_EOF: 467 qresult = QueryResult(id=prev_qid) 468 for hit in hit_list: 469 # not using append since Exonerate may separate the 470 # same hit if it has different strands 471 qresult.absorb(hit) 472 for attr, value in prev['qresult'].items(): 473 setattr(qresult, attr, value) 474 yield qresult 475 if file_state == state_EOF: 476 break 477 hit_list = [] 478 479 # only readline() here if we're not parsing C4 alignments 480 # C4 alignments readline() is handled by its parse_alignment_block 481 # function 482 if not self.has_c4_alignment: 483 self.line = self.handle.readline()
484 485
486 -class _BaseExonerateIndexer(SearchIndexer):
487 """Indexer class for Exonerate plain text.""" 488 489 _parser = None # should be defined by subclass 490 _query_mark = None # this one too 491
492 - def get_qresult_id(self, pos):
493 raise NotImplementedError("Should be defined by subclass")
494
495 - def __iter__(self):
496 """Iterates over the file handle; yields key, start offset, and length.""" 497 handle = self._handle 498 handle.seek(0) 499 qresult_key = None 500 501 while True: 502 start_offset = handle.tell() 503 line = handle.readline() 504 if line.startswith(self._query_mark): 505 if qresult_key is None: 506 qresult_key = self.get_qresult_id(start_offset) 507 qresult_offset = start_offset 508 else: 509 curr_key = self.get_qresult_id(start_offset) 510 if curr_key != qresult_key: 511 yield qresult_key, qresult_offset, \ 512 start_offset - qresult_offset 513 qresult_key = curr_key 514 qresult_offset = start_offset 515 handle.seek(qresult_offset) 516 elif not line: 517 yield qresult_key, qresult_offset, \ 518 start_offset - qresult_offset 519 break
520 521 522 # if not used as a module, run the doctest 523 if __name__ == "__main__": 524 from Bio._utils import run_doctest 525 run_doctest() 526