Package Bio :: Package SearchIO :: Package ExonerateIO :: Module exonerate_text
[hide private]
[frames] | [no frames]

Source Code for Module Bio.SearchIO.ExonerateIO.exonerate_text

  1  # Copyright 2012 by Wibowo Arindrarto.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO parser for Exonerate plain text output format.""" 
  7   
  8  import re 
  9  from itertools import chain 
 10   
 11  from Bio._py3k import _bytes_to_string 
 12  from Bio._py3k import zip 
 13   
 14   
 15  from Bio.Alphabet import generic_protein 
 16   
 17  from ._base import _BaseExonerateParser, _BaseExonerateIndexer, _STRAND_MAP, \ 
 18          _parse_hit_or_query_line 
 19  from .exonerate_vulgar import _RE_VULGAR 
 20   
 21   
 22  __all__ = ('ExonerateTextParser', 'ExonerateTextIndexer') 
 23   
 24   
 25  # for capturing sequences in alignment blocks 
 26  # e.g. ' 529 : ATCCCTTATCTCTTTATCTTGTA :    472' 
 27  _RE_ALN_ROW = re.compile(r'\s*\d+\s+: (.*) :\s+\d+') 
 28  # for splitting the line based on intron annotations 
 29  # e.g. '  >>>> Target Intron 1 >>>>  ' or 'gt.........................ag' 
 30  _RE_EXON = re.compile(r'[atgc ]{2,}?(?:(?:[<>]+ \w+ Intron \d+ [<>]+)|(?:\.+))[atgc ]{2,}?') 
 31  # captures the intron length 
 32  # from e.g. '61 bp // 154295 bp' (joint intron lengths) or '177446 bp' 
 33  _RE_EXON_LEN = re.compile(r'(?:(\d+) bp // (\d+) bp)|(?:(\d+) bp)') 
 34  # for splitting lines in the NER model 
 35  _RE_NER = re.compile(r'--<\s+\d+\s+>--') 
 36  # for capturing NER gap lengths 
 37  _RE_NER_LEN = re.compile(r'--<\s+(\d+)\s+>--') 
 38  # regexes for capturing the letters inside curly braces 
 39  # no. of letters is either 1 or 2, since they are split codons 
 40  _RE_SCODON_START = re.compile(r'\{(\w{1,2})\}$') 
 41  _RE_SCODON_END = re.compile(r'^\{(\w{1,2})\}') 
 42   
 43   
44 -def _flip_codons(codon_seq, target_seq):
45 """Flips the codon characters from one seq to another.""" 46 a, b = '', '' 47 for char1, char2 in zip(codon_seq, target_seq): 48 # no need to do anything if the codon seq line has nothing 49 if char1 == ' ': 50 a += char1 51 b += char2 52 else: 53 a += char2 54 b += char1 55 56 return a, b
57 58
def _get_block_coords(parsed_seq, row_dict, has_ner=False):
    """Returns a list of start, end coordinates for each given block in the sequence."""
    # NER models are split on gap markers, every other model on intron
    # annotations
    splitter = _RE_NER if has_ner else _RE_EXON

    # the query row serves as the positional reference
    ref_seq = parsed_seq[row_dict['query']]

    coords = []
    pos = 0
    for fragment in re.split(splitter, ref_seq):
        # locate this fragment at or after the previous fragment's start
        pos += ref_seq[pos:].find(fragment)
        coords.append((pos, pos + len(fragment)))

    return coords
79 -def _get_inter_coords(coords, strand=1):
80 """From the given pairs of coordinates, returns a list of pairs 81 covering the intervening ranges. 82 """ 83 # adapted from Python's itertools guide 84 # if strand is -1, adjust coords to the ends and starts are chained 85 if strand == -1: 86 sorted_coords = [(max(a, b), min(a, b)) for a, b in coords] 87 inter_coords = list(chain(*sorted_coords))[1:-1] 88 return list(zip(inter_coords[1::2], inter_coords[::2])) 89 else: 90 inter_coords = list(chain(*coords))[1:-1] 91 return list(zip(inter_coords[::2], inter_coords[1::2]))
92 93
94 -def _stitch_rows(raw_rows):
95 """Stitches together the parsed alignment rows and returns them in a list.""" 96 # deal with possible codon surprise! 97 # (i.e. alignments with codons using cdna2genome model) 98 # by creating additional rows to contain the codons 99 try: 100 max_len = max(len(x) for x in raw_rows) 101 for row in raw_rows: 102 assert len(row) == max_len 103 except AssertionError: 104 for idx, row in enumerate(raw_rows): 105 if len(row) != max_len: 106 # codons must be present in the query and hit (so +2) 107 assert len(row) + 2 == max_len 108 # add additional empty lines to contain codons 109 raw_rows[idx] = [' ' * len(row[0])] + row + [' ' * len(row[0])] 110 111 cmbn_rows = [] 112 for idx, row in enumerate(raw_rows[0]): 113 cmbn_row = ''.join(aln_row[idx] for aln_row in raw_rows) 114 cmbn_rows.append(cmbn_row) 115 116 # the real aligned sequence is always the 'outer' one, so we want 117 # to flip them with their 'inner' pairs 118 if len(cmbn_rows) == 5: 119 # flip query sequence 120 cmbn_rows[0], cmbn_rows[1] = \ 121 _flip_codons(cmbn_rows[0], cmbn_rows[1]) 122 # flip hit sequence 123 cmbn_rows[4], cmbn_rows[3] = \ 124 _flip_codons(cmbn_rows[4], cmbn_rows[3]) 125 126 return cmbn_rows
127 128
129 -def _get_row_dict(row_len, model):
130 """Returns a dictionary of row indices for parsing alignment blocks.""" 131 idx = {} 132 # 3 lines, usually in dna vs dna models 133 if row_len == 3: 134 idx['query'] = 0 135 idx['midline'] = 1 136 idx['hit'] = 2 137 idx['qannot'], idx['hannot'] = None, None 138 # 4 lines, in protein vs dna models or dna vs protein models 139 # TODO: currently we check this from the model string; is there 140 # a better way to do it? 141 elif row_len == 4: 142 if 'protein2' in model: 143 idx['query'] = 0 144 idx['midline'] = 1 145 idx['hit'] = 2 146 idx['hannot'] = 3 147 idx['qannot'] = None 148 elif '2protein' in model: 149 idx['query'] = 1 150 idx['midline'] = 2 151 idx['hit'] = 3 152 idx['hannot'] = None 153 idx['qannot'] = 0 154 else: 155 raise ValueError("Unexpected model: " + model) 156 # 5 lines, translated dna vs translated dna 157 elif row_len == 5: 158 # set sequence indexes 159 idx['qannot'] = 0 160 idx['query'] = 1 161 idx['midline'] = 2 162 idx['hit'] = 3 163 idx['hannot'] = 4 164 else: 165 raise ValueError("Unexpected row count in alignment block: " 166 "%i" % row_len) 167 return idx
168 169
170 -def _get_blocks(rows, coords, idx):
171 """Returns a list of dictionaries of sequences split by the coordinates.""" 172 for idx_name in ('query', 'hit', 'midline', 'qannot', 'hannot'): 173 assert idx_name in idx 174 blocks = [] 175 for start, end in coords: 176 block = {} 177 # get seqs according to index 178 block['query'] = rows[idx['query']][start:end] 179 block['hit'] = rows[idx['hit']][start:end] 180 block['similarity'] = rows[idx['midline']][start:end] 181 if idx['qannot'] is not None: 182 block['query_annotation'] = rows[idx['qannot']][start:end] 183 if idx['hannot'] is not None: 184 block['hit_annotation'] = rows[idx['hannot']][start:end] 185 blocks.append(block) 186 187 return blocks
188 189
def _get_scodon_moves(tmp_seq_blocks):
    """Returns a dictionary of split codon locations relative to each
    fragment's end.
    """
    scodon_moves = {'query': [], 'hit': []}
    for seq_type in scodon_moves:
        scoords = []
        for block in tmp_seq_blocks:
            seq = block[seq_type]
            # residues in curly braces at the tail of the fragment
            # (_RE_SCODON_START anchors at $)
            tail = re.search(_RE_SCODON_START, seq)
            scoords.append((len(tail.group(1)), 0) if tail else (0, 0))
            # residues in curly braces at the head of the fragment
            # (_RE_SCODON_END anchors at ^)
            head = re.search(_RE_SCODON_END, seq)
            scoords.append((0, len(head.group(1))) if head else (0, 0))
        scodon_moves[seq_type] = scoords

    return scodon_moves
216 -def _clean_blocks(tmp_seq_blocks):
217 """Removes curly braces (split codon markers) from the given sequences.""" 218 seq_blocks = [] 219 for seq_block in tmp_seq_blocks: 220 for line_name in seq_block: 221 seq_block[line_name] = \ 222 seq_block[line_name].replace('{', '').replace('}', '') 223 seq_blocks.append(seq_block) 224 225 return seq_blocks
226 227
228 -def _comp_intron_lens(seq_type, inter_blocks, raw_inter_lens):
229 """Returns the length of introns between fragments.""" 230 # set opposite type, for setting introns 231 opp_type = 'hit' if seq_type == 'query' else 'query' 232 # list of flags to denote if an intron follows a block 233 # it reads e.g. this line: 234 # "ATGTT{TT} >>>> Target Intron 1 >>>> {G}TGTGTGTACATT" 235 # and sets the opposing sequence type's intron (since this 236 # line is present on the opposite sequence type line) 237 has_intron_after = ['Intron' in x[seq_type] for x in 238 inter_blocks] 239 assert len(has_intron_after) == len(raw_inter_lens) 240 # create list containing coord adjustments incorporating 241 # intron lengths 242 inter_lens = [] 243 for flag, parsed_len in zip(has_intron_after, raw_inter_lens): 244 if flag: 245 # joint introns 246 if all(parsed_len[:2]): 247 # intron len is [0] if opp_type is query, otherwise it's [1] 248 intron_len = int(parsed_len[0]) if opp_type == 'query' \ 249 else int(parsed_len[1]) 250 # single hit/query introns 251 elif parsed_len[2]: 252 intron_len = int(parsed_len[2]) 253 else: 254 raise ValueError("Unexpected intron parsing " 255 "result: %r" % parsed_len) 256 else: 257 intron_len = 0 258 259 inter_lens.append(intron_len) 260 261 return inter_lens
262 263
def _comp_coords(hsp, seq_type, inter_lens):
    """Fill the block coordinates of the given hsp dictionary.

    Reads ``'<seq_type>_start'``, ``'<seq_type>_strand'`` and the list of
    fragment sequences under ``hsp[seq_type]``; returns a list of
    (start, end) pairs, one per fragment, with each pair ordered so the
    smaller coordinate comes first when the strand is minus.
    """
    assert seq_type in ('hit', 'query')
    # manually fill the first coord
    # seq_step is +1 on the plus strand, -1 on the minus strand, so all
    # coordinate arithmetic below moves in the strand's direction
    seq_step = 1 if hsp['%s_strand' % seq_type] >= 0 else -1
    fstart = hsp['%s_start' % seq_type]
    # fend is fstart + number of residues in the sequence, minus gaps
    # ('>' and '<' intron markers are stripped along with '-')
    fend = fstart + len(
        hsp[seq_type][0].replace('-', '').replace('>',
        '').replace('<', '')) * seq_step
    coords = [(fstart, fend)]
    # and start from the second block, after the first inter seq
    # each block starts where the previous one ended, shifted by the
    # intervening (intron/gap) length, all in strand direction
    for idx, block in enumerate(hsp[seq_type][1:]):
        bstart = coords[-1][1] + inter_lens[idx] * seq_step
        bend = bstart + seq_step * \
                len(block.replace('-', ''))
        coords.append((bstart, bend))

    # adjust the coords so the smallest is [0], if strand is -1
    # couldn't do this in the previous steps since we need the initial
    # block ordering
    if seq_step != 1:
        for idx, coord in enumerate(coords):
            coords[idx] = coords[idx][1], coords[idx][0]

    return coords
def _comp_split_codons(hsp, seq_type, scodon_moves):
    """Computes the positions of split codons and puts the values in the given
    HSP dictionary.

    Returns a list of (start, end) pairs, one per split codon, each ordered
    so that start <= end.  ``scodon_moves[seq_type]`` holds two entries per
    fragment (tail-of-fragment then head-of-fragment moves), hence the
    ``idx // 2`` mapping back onto ``hsp['<seq_type>_ranges']``.
    """
    scodons = []
    for idx in range(len(scodon_moves[seq_type])):
        pair = scodon_moves[seq_type][idx]
        # (0, 0) means no split codon at this fragment edge
        if not any(pair):
            continue
        else:
            # a fragment edge carries residues on only one side of the pair
            assert not all(pair)
        # a = residue count at the fragment's tail, b = at its head
        a, b = pair
        anchor_pair = hsp['%s_ranges' % seq_type][idx // 2]
        strand = 1 if hsp['%s_strand' % seq_type] >= 0 else -1

        if a:
            # tail split codon: anchor at the strand-wise end of the range
            # and extend backwards by a residues
            func = max if strand == 1 else min
            anchor = func(anchor_pair)
            start_c, end_c = anchor + a * strand * -1, anchor
        elif b:
            # head split codon: anchor at the strand-wise start of the range
            # and extend forwards by b residues
            func = min if strand == 1 else max
            anchor = func(anchor_pair)
            start_c, end_c = anchor + b * strand, anchor
        scodons.append((min(start_c, end_c), max(start_c, end_c)))

    return scodons
class ExonerateTextParser(_BaseExonerateParser):
    """Parser for Exonerate plain text output."""

    # marker line that starts every alignment block in the output
    _ALN_MARK = 'C4 Alignment:'

    def parse_alignment_block(self, header):
        """Parse one human-readable alignment block into the qresult, hit,
        and hsp dictionaries taken from ``header``, filling in sequences,
        coordinates, and (for intron-containing models) split codon ranges.
        """
        qresult = header['qresult']
        hit = header['hit']
        hsp = header['hsp']
        # check for values that must have been set by previous methods
        for val_name in ('query_start', 'query_end', 'hit_start', 'hit_end',
                'query_strand', 'hit_strand'):
            assert val_name in hsp, hsp

        # get the alignment rows
        # and stitch them so we have the full sequences in single strings
        raw_aln_blocks, vulgar_comp = self._read_alignment()
        # cmbn_rows still has split codon markers (curly braces)
        cmbn_rows = _stitch_rows(raw_aln_blocks)
        row_dict = _get_row_dict(len(cmbn_rows), qresult['model'])
        # get the sequence blocks
        has_ner = 'NER' in qresult['model'].upper()
        seq_coords = _get_block_coords(cmbn_rows, row_dict, has_ner)
        tmp_seq_blocks = _get_blocks(cmbn_rows, seq_coords, row_dict)
        # get split codon temp coords for later use
        # this result in pairs of base movement for both ends of each row
        scodon_moves = _get_scodon_moves(tmp_seq_blocks)
        # remove the split codon markers
        seq_blocks = _clean_blocks(tmp_seq_blocks)

        # adjust strands
        hsp['query_strand'] = _STRAND_MAP[hsp['query_strand']]
        hsp['hit_strand'] = _STRAND_MAP[hsp['hit_strand']]
        # cast coords into ints
        hsp['query_start'] = int(hsp['query_start'])
        hsp['query_end'] = int(hsp['query_end'])
        hsp['hit_start'] = int(hsp['hit_start'])
        hsp['hit_end'] = int(hsp['hit_end'])
        # cast score into ints
        hsp['score'] = int(hsp['score'])
        # set sequences
        hsp['query'] = [x['query'] for x in seq_blocks]
        hsp['hit'] = [x['hit'] for x in seq_blocks]
        hsp['aln_annotation'] = {}
        # set the alphabet
        # currently only limited to models with protein queries
        if 'protein2' in qresult['model'] or 'coding2' in qresult['model'] \
                or '2protein' in qresult['model']:
            hsp['alphabet'] = generic_protein
        # get the annotations if they exist
        for annot_type in ('similarity', 'query_annotation', 'hit_annotation'):
            try:
                hsp['aln_annotation'][annot_type] = \
                        [x[annot_type] for x in seq_blocks]
            except KeyError:
                pass

        # use vulgar coordinates if vulgar line is present and return
        # if vulgar_comp is not None:
        #     hsp = parse_vulgar_comp(hsp, vulgar_comp)

        #     return {'qresult': qresult, 'hit': hit, 'hsp': hsp}

        # otherwise we need to get the coordinates from the alignment
        # get the intervening blocks first, so we can use them
        # to adjust the coordinates
        if not has_ner:
            # get intervening coordinates and blocks, only if model is not ner
            # ner models have a much more simple coordinate calculation
            inter_coords = _get_inter_coords(seq_coords)
            inter_blocks = _get_blocks(cmbn_rows, inter_coords, row_dict)
            # returns a three-component tuple of intron lengths
            # first two component filled == intron in hit and query
            # last component filled == intron in hit or query
            raw_inter_lens = re.findall(_RE_EXON_LEN,
                    cmbn_rows[row_dict['midline']])

        # compute start and end coords for each block
        for seq_type in ('query', 'hit'):

            # ner blocks and intron blocks require different adjustments
            if not has_ner:
                opp_type = 'hit' if seq_type == 'query' else 'query'
                inter_lens = _comp_intron_lens(seq_type, inter_blocks,
                        raw_inter_lens)
            else:
                # for NER blocks, the length of the inter-fragment gaps is
                # written on the same strand, so opp_type is seq_type
                opp_type = seq_type
                inter_lens = [int(x) for x in
                        re.findall(_RE_NER_LEN, cmbn_rows[row_dict[seq_type]])]

            # check that inter_lens's length is len opp_type block - 1
            assert len(inter_lens) == len(hsp[opp_type]) - 1, \
                    "%r vs %r" % (len(inter_lens), len(hsp[opp_type]) - 1)
            # fill the hsp query and hit coordinates
            hsp['%s_ranges' % opp_type] = \
                    _comp_coords(hsp, opp_type, inter_lens)
            # and fill the split codon coordinates, if model != ner
            # can't do this in the if-else clause above since we need to
            # compute the ranges first
            if not has_ner:
                hsp['%s_split_codons' % opp_type] = \
                        _comp_split_codons(hsp, opp_type, scodon_moves)

        # now that we've finished parsing coords, we can set the hit and start
        # coord according to Biopython's convention (start <= end)
        for seq_type in ('query', 'hit'):
            if hsp['%s_strand' % seq_type] == -1:
                n_start = '%s_start' % seq_type
                n_end = '%s_end' % seq_type
                hsp[n_start], hsp[n_end] = hsp[n_end], hsp[n_start]

        return {'qresult': qresult, 'hit': hit, 'hsp': hsp}

    def _read_alignment(self):
        """Reads the raw alignment block strings, returns them in a list.

        Consumes lines from ``self.handle`` (advancing ``self.line``) until
        EOF or the next alignment marker; also returns the vulgar components
        string if a 'vulgar' line was seen, else None.
        """
        raw_aln_blocks = []
        # flag to check whether we're in an aligment row
        in_aln_row = False
        # flag for vulgar line, if present, we can parse coordinates from it
        vulgar_comp = None
        while True:

            match = re.search(_RE_ALN_ROW, self.line.strip())
            # if we have a match, set flags and values
            if match and not in_aln_row:
                # remember where the aligned sequence starts so subsequent
                # (annotation) rows can be sliced at the same columns
                start_idx = self.line.index(match.group(1))
                row_len = len(match.group(1))
                in_aln_row = True
                raw_aln_block = []
            # if we're in an alignment row, grab the sequence
            if in_aln_row:
                raw_aln_block.append(self.line[start_idx:start_idx + row_len])
            # reset flags and values if the line matches, we're in an alignment
            # row, and there are more than 1 line in rows
            if match and in_aln_row and len(raw_aln_block) > 1:
                raw_aln_blocks.append(raw_aln_block)
                start_idx = None
                row_len = None
                in_aln_row = False

            self.line = self.handle.readline()
            # try to parse vulgar line if present
            if self.line.startswith('vulgar'):
                vulgar = re.search(_RE_VULGAR, self.line)
                vulgar_comp = vulgar.group(10)
            if not self.line or self.line.startswith(self._ALN_MARK):
                # HACK: this is so that the parse_qresult method does not
                # yield the objects before appending the last HSP. We are doing
                # this to keep the parser compatible with outputs without
                # human-readable alignment outputs. This also relies on the
                # fact that repeated readline() always returns '' on EOF.
                if not self.line:
                    self.line = 'mock'
                break

        return raw_aln_blocks, vulgar_comp
class ExonerateTextIndexer(_BaseExonerateIndexer):
    """Indexer class for Exonerate plain text."""

    # parser used to materialise indexed records
    _parser = ExonerateTextParser
    # byte marker that opens each query's section in the file
    _query_mark = b"C4 Alignment"

    def get_qresult_id(self, pos):
        """Returns the query ID from the nearest "Query:" line."""
        handle = self._handle
        handle.seek(pos)
        sentinel = b"Query:"

        while True:
            line = handle.readline().strip()
            if line.startswith(sentinel):
                break
            # NOTE(review): a stripped blank line is indistinguishable from
            # EOF here, so this relies on no blank lines appearing before
            # the "Query:" line -- confirm against exonerate's block layout
            if not line:
                raise StopIteration
        qid, desc = _parse_hit_or_query_line(_bytes_to_string(line))

        return qid

    def get_raw(self, offset):
        """Returns the raw string of a QueryResult object from the given offset."""
        handle = self._handle
        handle.seek(offset)
        qresult_key = None
        qresult_raw = b""

        while True:
            line = handle.readline()
            if not line:
                break
            elif line.startswith(self._query_mark):
                cur_pos = handle.tell()
                if qresult_key is None:
                    qresult_key = self.get_qresult_id(cur_pos)
                else:
                    curr_key = self.get_qresult_id(cur_pos)
                    # a different query ID marks the end of this record
                    if curr_key != qresult_key:
                        break
                # get_qresult_id advanced the handle; rewind to just after
                # the marker line before accumulating
                handle.seek(cur_pos)
            qresult_raw += line

        return qresult_raw
# if not used as a module, run the doctest
if __name__ == "__main__":
    from Bio._utils import run_doctest
    run_doctest()