Package Bio :: Package SearchIO :: Package ExonerateIO :: Module exonerate_text
[hide private]
[frames] | [no frames]

Source Code for Module Bio.SearchIO.ExonerateIO.exonerate_text

  1  # Copyright 2012 by Wibowo Arindrarto.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO parser for Exonerate plain text output format.""" 
  7   
  8  import re 
  9  from itertools import chain 
 10   
 11  from Bio._py3k import _as_bytes, _bytes_to_string 
 12  from Bio._py3k import zip 
 13   
 14   
 15  from Bio.Alphabet import generic_protein 
 16   
 17  from ._base import _BaseExonerateParser, _BaseExonerateIndexer, _STRAND_MAP, \ 
 18          _parse_hit_or_query_line 
 19  from .exonerate_vulgar import parse_vulgar_comp, _RE_VULGAR 
 20   
 21   
# Names exported when this module is star-imported.
__all__ = ['ExonerateTextParser', 'ExonerateTextIndexer']


# for capturing sequences in alignment blocks
# e.g. ' 529 : ATCCCTTATCTCTTTATCTTGTA :    472'
_RE_ALN_ROW = re.compile(r'\s*\d+\s+: (.*) :\s+\d+')
# for splitting the line based on intron annotations
# e.g. '  >>>> Target Intron 1 >>>>  ' or 'gt.........................ag'
_RE_EXON = re.compile(r'[atgc ]{2,}?(?:(?:[<>]+ \w+ Intron \d+ [<>]+)|(?:\.+))[atgc ]{2,}?')
# captures the intron length
# from e.g. '61 bp // 154295 bp' (joint intron lengths) or '177446 bp'
# exactly one of the groups (1, 2) or group 3 is filled per match
_RE_EXON_LEN = re.compile(r'(?:(\d+) bp // (\d+) bp)|(?:(\d+) bp)')
# for splitting lines in the NER model
_RE_NER = re.compile(r'--<\s+\d+\s+>--')
# for capturing NER gap lengths, e.g. '--< 43738 >--'
_RE_NER_LEN = re.compile(r'--<\s+(\d+)\s+>--')
# regexes for capturing the letters inside curly braces
# no. of letters is either 1 or 2, since they are split codons
_RE_SCODON_START = re.compile(r'\{(\w{1,2})\}$')
_RE_SCODON_END = re.compile(r'^\{(\w{1,2})\}')
 42   
 43   
44 -def _flip_codons(codon_seq, target_seq):
45 """Flips the codon characters from one seq to another.""" 46 a, b = '', '' 47 for char1, char2 in zip(codon_seq, target_seq): 48 # no need to do anything if the codon seq line has nothing 49 if char1 == ' ': 50 a += char1 51 b += char2 52 else: 53 a += char2 54 b += char1 55 56 return a, b
57 58
def _get_block_coords(parsed_seq, row_dict, has_ner=False):
    """Returns a list of start, end coordinates for each given block in the sequence."""
    # NER models are split on gap markers, everything else on intron markers
    splitter = _RE_NER if has_ner else _RE_EXON

    # use the query line for reference
    seq = parsed_seq[row_dict['query']]

    coords = []
    pos = 0
    for fragment in re.split(splitter, seq):
        # locate each fragment starting from where the previous one began;
        # fragments occur in order, so this find is unambiguous
        pos += seq[pos:].find(fragment)
        coords.append((pos, pos + len(fragment)))

    return coords
77 78
79 -def _get_inter_coords(coords, strand=1):
80 """From the given pairs of coordinates, returns a list of pairs 81 covering the intervening ranges.""" 82 # adapted from Python's itertools guide 83 # if strand is -1, adjust coords to the ends and starts are chained 84 if strand == -1: 85 sorted_coords = [(max(a, b), min(a, b)) for a, b in coords] 86 inter_coords = list(chain(*sorted_coords))[1:-1] 87 return list(zip(inter_coords[1::2], inter_coords[::2])) 88 else: 89 inter_coords = list(chain(*coords))[1:-1] 90 return list(zip(inter_coords[::2], inter_coords[1::2]))
91 92
93 -def _stitch_rows(raw_rows):
94 """Stitches together the parsed alignment rows and returns them in a list.""" 95 # deal with possible codon surprise! 96 # (i.e. alignments with codons using cdna2genome model) 97 # by creating additional rows to contain the codons 98 try: 99 max_len = max(len(x) for x in raw_rows) 100 for row in raw_rows: 101 assert len(row) == max_len 102 except AssertionError: 103 for idx, row in enumerate(raw_rows): 104 if len(row) != max_len: 105 # codons must be present in the query and hit (so +2) 106 assert len(row) + 2 == max_len 107 # add additional empty lines to contain codons 108 raw_rows[idx] = [' ' * len(row[0])] + row + [' ' * len(row[0])] 109 110 cmbn_rows = [] 111 for idx, row in enumerate(raw_rows[0]): 112 cmbn_row = ''.join(aln_row[idx] for aln_row in raw_rows) 113 cmbn_rows.append(cmbn_row) 114 115 # the real aligned sequence is always the 'outer' one, so we want 116 # to flip them with their 'inner' pairs 117 if len(cmbn_rows) == 5: 118 # flip query sequence 119 cmbn_rows[0], cmbn_rows[1] = \ 120 _flip_codons(cmbn_rows[0], cmbn_rows[1]) 121 # flip hit sequence 122 cmbn_rows[4], cmbn_rows[3] = \ 123 _flip_codons(cmbn_rows[4], cmbn_rows[3]) 124 125 return cmbn_rows
126 127
128 -def _get_row_dict(row_len):
129 """Returns a dictionary of row indices for parsing alignment blocks.""" 130 idx = {} 131 # 3 lines, usually in dna vs dna models 132 if row_len == 3: 133 idx['query'] = 0 134 idx['midline'] = 1 135 idx['hit'] = 2 136 idx['qannot'], idx['hannot'] = None, None 137 # 4 lines, in protein vs dna models 138 elif row_len == 4: 139 idx['query'] = 0 140 idx['midline'] = 1 141 idx['hit'] = 2 142 idx['hannot'] = 3 143 idx['qannot'] = None 144 # 5 lines, translated dna vs translated dna 145 elif row_len == 5: 146 # set sequence indexes 147 idx['qannot'] = 0 148 idx['query'] = 1 149 idx['midline'] = 2 150 idx['hit'] = 3 151 idx['hannot'] = 4 152 else: 153 raise ValueError("Unexpected row count in alignment block: " 154 "%i" % row_len) 155 return idx
156 157
158 -def _get_blocks(rows, coords, idx):
159 """Returns a list of dictionaries of sequences split by the coordinates.""" 160 for idx_name in ('query', 'hit', 'midline', 'qannot', 'hannot'): 161 assert idx_name in idx 162 blocks = [] 163 for start, end in coords: 164 block = {} 165 # get seqs according to index 166 block['query'] = rows[idx['query']][start:end] 167 block['hit'] = rows[idx['hit']][start:end] 168 block['homology'] = rows[idx['midline']][start:end] 169 if idx['qannot'] is not None: 170 block['query_annotation'] = rows[idx['qannot']][start:end] 171 if idx['hannot'] is not None: 172 block['hit_annotation'] = rows[idx['hannot']][start:end] 173 blocks.append(block) 174 175 return blocks
176 177
def _get_scodon_moves(tmp_seq_blocks):
    """Returns a dictionary of split codon locations relative to each
    fragment's end"""
    scodon_moves = {'query': [], 'hit': []}
    for seq_type in scodon_moves:
        scoords = []
        for block in tmp_seq_blocks:
            seq = block[seq_type]
            # residues wrapped in curly braces at either end of the
            # fragment mark a split codon
            m_start = re.search(_RE_SCODON_START, seq)
            m_end = re.search(_RE_SCODON_END, seq)
            # first pair: residues trailing the fragment, second pair:
            # residues leading the next fragment (zero when absent)
            scoords.append((len(m_start.group(1)), 0) if m_start else (0, 0))
            scoords.append((0, len(m_end.group(1))) if m_end else (0, 0))
        scodon_moves[seq_type] = scoords

    return scodon_moves
201 202
203 -def _clean_blocks(tmp_seq_blocks):
204 """Removes curly braces (split codon markers) from the given sequences.""" 205 seq_blocks = [] 206 for seq_block in tmp_seq_blocks: 207 for line_name in seq_block: 208 seq_block[line_name] = \ 209 seq_block[line_name].replace('{', '').replace('}', '') 210 seq_blocks.append(seq_block) 211 212 return seq_blocks
213 214
215 -def _comp_intron_lens(seq_type, inter_blocks, raw_inter_lens):
216 """Returns the length of introns between fragments.""" 217 # set opposite type, for setting introns 218 opp_type = 'hit' if seq_type == 'query' else 'query' 219 # list of flags to denote if an intron follows a block 220 # it reads e.g. this line: 221 # "ATGTT{TT} >>>> Target Intron 1 >>>> {G}TGTGTGTACATT" 222 # and sets the opposing sequence type's intron (since this 223 # line is present on the opposite sequence type line) 224 has_intron_after = ['Intron' in x[seq_type] for x in 225 inter_blocks] 226 assert len(has_intron_after) == len(raw_inter_lens) 227 # create list containing coord adjustments incorporating 228 # intron lengths 229 inter_lens = [] 230 for flag, parsed_len in zip(has_intron_after, raw_inter_lens): 231 if flag: 232 # joint introns 233 if all(parsed_len[:2]): 234 # intron len is [0] if opp_type is query, otherwise it's [1] 235 intron_len = int(parsed_len[0]) if opp_type == 'query' \ 236 else int(parsed_len[1]) 237 # single hit/query introns 238 elif parsed_len[2]: 239 intron_len = int(parsed_len[2]) 240 else: 241 raise ValueError("Unexpected intron parsing " 242 "result: %r" % parsed_len) 243 else: 244 intron_len = 0 245 246 inter_lens.append(intron_len) 247 248 return inter_lens
249 250
251 -def _comp_coords(hsp, seq_type, inter_lens):
252 """Fill the block coordinates of the given hsp dictionary.""" 253 assert seq_type in ('hit', 'query') 254 # manually fill the first coord 255 seq_step = 1 if hsp['%s_strand' % seq_type] >= 0 else -1 256 fstart = hsp['%s_start' % seq_type] 257 # fend is fstart + number of residues in the sequence, minus gaps 258 fend = fstart + len( 259 hsp[seq_type][0].replace('-', '').replace('>', 260 '').replace('<', '')) * seq_step 261 coords = [(fstart, fend)] 262 # and start from the second block, after the first inter seq 263 for idx, block in enumerate(hsp[seq_type][1:]): 264 bstart = coords[-1][1] + inter_lens[idx] * seq_step 265 bend = bstart + seq_step * \ 266 len(block.replace('-', '')) 267 coords.append((bstart, bend)) 268 269 # adjust the coords so the smallest is [0], if strand is -1 270 # couldn't do this in the previous steps since we need the initial 271 # block ordering 272 if seq_step != 1: 273 for idx, coord in enumerate(coords): 274 coords[idx] = coords[idx][1], coords[idx][0] 275 276 return coords
277 278
279 -def _comp_split_codons(hsp, seq_type, scodon_moves):
280 """Computes the positions of split codons and puts the values in the given 281 HSP dictionary.""" 282 scodons = [] 283 for idx in range(len(scodon_moves[seq_type])): 284 pair = scodon_moves[seq_type][idx] 285 if not any(pair): 286 continue 287 else: 288 assert not all(pair) 289 a, b = pair 290 anchor_pair = hsp['%s_ranges' % seq_type][idx // 2] 291 strand = 1 if hsp['%s_strand' % seq_type] >= 0 else -1 292 293 if a: 294 func = max if strand == 1 else min 295 anchor = func(anchor_pair) 296 start_c, end_c = anchor + a * strand * -1, anchor 297 elif b: 298 func = min if strand == 1 else max 299 anchor = func(anchor_pair) 300 start_c, end_c = anchor + b * strand, anchor 301 scodons.append((min(start_c, end_c), max(start_c, end_c))) 302 303 return scodons
304 305
class ExonerateTextParser(_BaseExonerateParser):

    """Parser for Exonerate plain text output."""

    # marker line that starts every human-readable alignment section
    _ALN_MARK = 'C4 Alignment:'

    def parse_alignment_block(self, header):
        """Parse one alignment block, filling in the HSP coordinates.

        ``header`` is the dictionary of partially-filled 'qresult', 'hit',
        and 'hsp' dictionaries produced by the base parser; the completed
        dictionaries are returned in the same structure.
        """
        qresult = header['qresult']
        hit = header['hit']
        hsp = header['hsp']
        # check for values that must have been set by previous methods
        for val_name in ('query_start', 'query_end', 'hit_start', 'hit_end',
                         'query_strand', 'hit_strand'):
            assert val_name in hsp, hsp

        # get the alignment rows
        # and stitch them so we have the full sequences in single strings
        raw_aln_blocks, vulgar_comp = self._read_alignment()
        # cmbn_rows still has split codon markers (curly braces)
        cmbn_rows = _stitch_rows(raw_aln_blocks)
        row_dict = _get_row_dict(len(cmbn_rows))
        # get the sequence blocks
        has_ner = 'NER' in qresult['model'].upper()
        seq_coords = _get_block_coords(cmbn_rows, row_dict, has_ner)
        tmp_seq_blocks = _get_blocks(cmbn_rows, seq_coords, row_dict)
        # get split codon temp coords for later use
        # this results in pairs of base movement for both ends of each row
        scodon_moves = _get_scodon_moves(tmp_seq_blocks)
        # remove the split codon markers
        seq_blocks = _clean_blocks(tmp_seq_blocks)

        # adjust strands ('+'/'-'/'.' strings -> 1/-1/0 ints)
        hsp['query_strand'] = _STRAND_MAP[hsp['query_strand']]
        hsp['hit_strand'] = _STRAND_MAP[hsp['hit_strand']]
        # cast coords into ints
        hsp['query_start'] = int(hsp['query_start'])
        hsp['query_end'] = int(hsp['query_end'])
        hsp['hit_start'] = int(hsp['hit_start'])
        hsp['hit_end'] = int(hsp['hit_end'])
        # cast score into ints
        hsp['score'] = int(hsp['score'])
        # set sequences, one string per fragment
        hsp['query'] = [x['query'] for x in seq_blocks]
        hsp['hit'] = [x['hit'] for x in seq_blocks]
        hsp['aln_annotation'] = {}
        # set the alphabet
        # currently only limited to models with protein queries
        if 'protein2' in qresult['model'] or 'coding2' in qresult['model']:
            hsp['alphabet'] = generic_protein
        # get the annotations if they exist; models without annotation
        # rows simply lack the keys, hence the KeyError pass-through
        for annot_type in ('homology', 'query_annotation', 'hit_annotation'):
            try:
                hsp['aln_annotation'][annot_type] = \
                        [x[annot_type] for x in seq_blocks]
            except KeyError:
                pass

        # use vulgar coordinates if vulgar line is present and return
        # NOTE(review): this shortcut is disabled; coordinates are always
        # recomputed from the alignment below — confirm intentional
        #if vulgar_comp is not None:
        #    hsp = parse_vulgar_comp(hsp, vulgar_comp)

        #    return {'qresult': qresult, 'hit': hit, 'hsp': hsp}

        # otherwise we need to get the coordinates from the alignment
        # get the intervening blocks first, so we can use them
        # to adjust the coordinates
        if not has_ner:
            # get intervening coordinates and blocks, only if model is not ner
            # ner models have a much more simple coordinate calculation
            inter_coords = _get_inter_coords(seq_coords)
            inter_blocks = _get_blocks(cmbn_rows, inter_coords, row_dict)
            # returns a three-component tuple of intron lengths
            # first two components filled == intron in hit and query
            # last component filled == intron in hit or query
            raw_inter_lens = re.findall(_RE_EXON_LEN,
                                        cmbn_rows[row_dict['midline']])

        # compute start and end coords for each block
        for seq_type in ('query', 'hit'):

            # ner blocks and intron blocks require different adjustments
            if not has_ner:
                opp_type = 'hit' if seq_type == 'query' else 'query'
                inter_lens = _comp_intron_lens(seq_type, inter_blocks,
                                               raw_inter_lens)
            else:
                # for NER blocks, the length of the inter-fragment gaps is
                # written on the same strand, so opp_type is seq_type
                opp_type = seq_type
                inter_lens = [int(x) for x in
                              re.findall(_RE_NER_LEN,
                                         cmbn_rows[row_dict[seq_type]])]

            # check that inter_lens's length is len opp_type block - 1
            assert len(inter_lens) == len(hsp[opp_type])-1, \
                    "%r vs %r" % (len(inter_lens), len(hsp[opp_type])-1)
            # fill the hsp query and hit coordinates
            hsp['%s_ranges' % opp_type] = \
                    _comp_coords(hsp, opp_type, inter_lens)
            # and fill the split codon coordinates, if model != ner
            # can't do this in the if-else clause above since we need to
            # compute the ranges first
            if not has_ner:
                hsp['%s_split_codons' % opp_type] = \
                        _comp_split_codons(hsp, opp_type, scodon_moves)

        # now that we've finished parsing coords, we can set the hit and start
        # coord according to Biopython's convention (start <= end)
        for seq_type in ('query', 'hit'):
            if hsp['%s_strand' % seq_type] == -1:
                n_start = '%s_start' % seq_type
                n_end = '%s_end' % seq_type
                hsp[n_start], hsp[n_end] = hsp[n_end], hsp[n_start]

        return {'qresult': qresult, 'hit': hit, 'hsp': hsp}

    def _read_alignment(self):
        """Reads the raw alignment block strings, returns them in a list.

        Also returns the vulgar components string when a 'vulgar' line is
        present in the section, otherwise None.
        """
        raw_aln_blocks = []
        # flag to check whether we're in an alignment row
        in_aln_row = False
        # flag for vulgar line, if present, we can parse coordinates from it
        vulgar_comp = None
        while True:

            match = re.search(_RE_ALN_ROW, self.line.strip())
            # if we have a match, set flags and values; the column span of
            # the first row is reused to slice the annotation rows below it
            if match and not in_aln_row:
                start_idx = self.line.index(match.group(1))
                row_len = len(match.group(1))
                in_aln_row = True
                raw_aln_block = []
            # if we're in an alignment row, grab the sequence
            if in_aln_row:
                raw_aln_block.append(self.line[start_idx:start_idx+row_len])
            # reset flags and values if the line matches, we're in an alignment
            # row, and there are more than 1 line in rows
            if match and in_aln_row and len(raw_aln_block) > 1:
                raw_aln_blocks.append(raw_aln_block)
                start_idx = None
                row_len = None
                in_aln_row = False

            self.line = self.handle.readline()
            # try to parse vulgar line if present
            if self.line.startswith('vulgar'):
                vulgar = re.search(_RE_VULGAR, self.line)
                vulgar_comp = vulgar.group(10)
            if not self.line or self.line.startswith(self._ALN_MARK):
                # HACK: this is so that the parse_qresult method does not
                # yield the objects before appending the last HSP. We are doing
                # this to keep the parser compatible with outputs without
                # human-readable alignment outputs. This also relies on the
                # fact that repeated readline() always returns '' on EOF.
                if not self.line:
                    self.line = 'mock'
                break

        return raw_aln_blocks, vulgar_comp
464 465
class ExonerateTextIndexer(_BaseExonerateIndexer):

    """Indexer class for Exonerate plain text."""

    # parser used to build QueryResult objects from raw strings
    _parser = ExonerateTextParser
    # byte marker that starts each alignment section
    _query_mark = _as_bytes('C4 Alignment')

    def get_qresult_id(self, pos):
        """Returns the query ID from the nearest "Query:" line.

        ``pos`` is a byte offset into the handle; scanning starts there and
        stops at the first "Query:" line.  Raises StopIteration on EOF so
        the indexer's iteration terminates cleanly.
        """
        handle = self._handle
        handle.seek(pos)
        sentinel = _as_bytes('Query:')

        while True:
            line = handle.readline().strip()
            if line.startswith(sentinel):
                break
            if not line:
                raise StopIteration
        # description is parsed but unused here; only the ID is needed
        qid, desc = _parse_hit_or_query_line(_bytes_to_string(line))

        return qid

    def get_raw(self, offset):
        """Returns the raw string of a QueryResult object from the given offset.

        Accumulates bytes from ``offset`` until a section belonging to a
        different query ID (or EOF) is reached.
        """
        handle = self._handle
        handle.seek(offset)
        qresult_key = None
        qresult_raw = _as_bytes('')

        while True:
            line = handle.readline()
            if not line:
                break
            elif line.startswith(self._query_mark):
                # peek ahead for the section's query ID, then rewind so the
                # marker line itself is included in the raw output
                cur_pos = handle.tell()
                if qresult_key is None:
                    qresult_key = self.get_qresult_id(cur_pos)
                else:
                    curr_key = self.get_qresult_id(cur_pos)
                    if curr_key != qresult_key:
                        break
                handle.seek(cur_pos)
            qresult_raw += line

        return qresult_raw
# if not used as a module, run the doctest
if __name__ == "__main__":
    from Bio._utils import run_doctest
    run_doctest()