Package Bio :: Package SearchIO :: Package ExonerateIO :: Module exonerate_text
[hide private]
[frames] | no frames]

Source Code for Module Bio.SearchIO.ExonerateIO.exonerate_text

  1  # Copyright 2012 by Wibowo Arindrarto.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO parser for Exonerate plain text output format.""" 
  7   
  8  import re 
  9  from itertools import chain 
 10   
 11  from Bio._py3k import _as_bytes, _bytes_to_string 
 12  from Bio.Alphabet import generic_protein 
 13   
 14  from _base import _BaseExonerateParser, _BaseExonerateIndexer, _STRAND_MAP, \ 
 15          _parse_hit_or_query_line 
 16  from exonerate_vulgar import parse_vulgar_comp, _RE_VULGAR 
 17   
 18   
 19  __all__ = ['ExonerateTextParser', 'ExonerateTextIndexer'] 
 20   
 21   
 22  # for capturing sequences in alignment blocks 
 23  # e.g. ' 529 : ATCCCTTATCTCTTTATCTTGTA :    472' 
 24  _RE_ALN_ROW = re.compile(r'\s*\d+\s+: (.*) :\s+\d+') 
 25  # for splitting the line based on intron annotations 
 26  # e.g. '  >>>> Target Intron 1 >>>>  ' or 'gt.........................ag' 
 27  _RE_EXON = re.compile(r'[atgc ]{2,}?(?:(?:[<>]+ \w+ Intron \d+ [<>]+)|(?:\.+))[atgc ]{2,}?') 
 28  # captures the intron length 
 29  # from e.g. '61 bp // 154295 bp' (joint intron lengths) or '177446 bp' 
 30  _RE_EXON_LEN = re.compile(r'(?:(\d+) bp // (\d+) bp)|(?:(\d+) bp)') 
 31  # for splitting lines in the NER model 
 32  _RE_NER = re.compile(r'--<\s+\d+\s+>--') 
 33  # for capturing NER gap lengths 
 34  _RE_NER_LEN = re.compile(r'--<\s+(\d+)\s+>--') 
 35  # regexes for capturing the letters inside curly braces 
 36  # no. of letters is either 1 or 2, since they are split codons 
 37  _RE_SCODON_START = re.compile(r'\{(\w{1,2})\}$') 
 38  _RE_SCODON_END = re.compile(r'^\{(\w{1,2})\}') 
 39   
 40   
41 -def _flip_codons(codon_seq, target_seq):
42 """Flips the codon characters from one seq to another.""" 43 a, b = '', '' 44 for char1, char2 in zip(codon_seq, target_seq): 45 # no need to do anything if the codon seq line has nothing 46 if char1 == ' ': 47 a += char1 48 b += char2 49 else: 50 a += char2 51 b += char1 52 53 return a, b
54 55
56 -def _get_block_coords(parsed_seq, row_dict, has_ner=False):
57 """Returns a list of start, end coordinates for each given block in the sequence.""" 58 start = 0 59 coords = [] 60 if not has_ner: 61 splitter = _RE_EXON 62 else: 63 splitter = _RE_NER 64 65 # use the query line for reference 66 seq = parsed_seq[row_dict['query']] 67 68 for block in re.split(splitter, seq): 69 start += seq[start:].find(block) 70 end = start + len(block) 71 coords.append((start, end)) 72 73 return coords
74 75
76 -def _get_inter_coords(coords, strand=1):
77 """From the given pairs of coordinates, returns a list of pairs 78 covering the intervening ranges.""" 79 # adapted from Python's itertools guide 80 # if strand is -1, adjust coords to the ends and starts are chained 81 if strand == -1: 82 sorted_coords = [(max(a, b), min(a, b)) for a, b in coords] 83 inter_coords = list(chain(*sorted_coords))[1:-1] 84 return zip(inter_coords[1::2], inter_coords[::2]) 85 else: 86 inter_coords = list(chain(*coords))[1:-1] 87 return zip(inter_coords[::2], inter_coords[1::2])
88 89
90 -def _stitch_rows(raw_rows):
91 """Stitches together the parsed alignment rows and returns them in a list.""" 92 # deal with possible codon surprise! 93 # (i.e. alignments with codons using cdna2genome model) 94 # by creating additional rows to contain the codons 95 try: 96 max_len = max([len(x) for x in raw_rows]) 97 for row in raw_rows: 98 assert len(row) == max_len 99 except AssertionError: 100 for idx, row in enumerate(raw_rows): 101 if len(row) != max_len: 102 # codons must be present in the query and hit (so +2) 103 assert len(row) + 2 == max_len 104 # add additional empty lines to contain codons 105 raw_rows[idx] = [' ' * len(row[0])] + row + [' ' * len(row[0])] 106 107 cmbn_rows = [] 108 for idx, row in enumerate(raw_rows[0]): 109 cmbn_row = ''.join(aln_row[idx] for aln_row in raw_rows) 110 cmbn_rows.append(cmbn_row) 111 112 # the real aligned sequence is always the 'outer' one, so we want 113 # to flip them with their 'inner' pairs 114 if len(cmbn_rows) == 5: 115 # flip query sequence 116 cmbn_rows[0], cmbn_rows[1] = \ 117 _flip_codons(cmbn_rows[0], cmbn_rows[1]) 118 # flip hit sequence 119 cmbn_rows[4], cmbn_rows[3] = \ 120 _flip_codons(cmbn_rows[4], cmbn_rows[3]) 121 122 return cmbn_rows
123 124
125 -def _get_row_dict(row_len):
126 """Returns a dictionary of row indices for parsing alignment blocks.""" 127 idx = {} 128 # 3 lines, usually in dna vs dna models 129 if row_len == 3: 130 idx['query'] = 0 131 idx['midline'] = 1 132 idx['hit'] = 2 133 idx['qannot'], idx['hannot'] = None, None 134 # 4 lines, in protein vs dna models 135 elif row_len == 4: 136 idx['query'] = 0 137 idx['midline'] = 1 138 idx['hit'] = 2 139 idx['hannot'] = 3 140 idx['qannot'] = None 141 # 5 lines, translated dna vs translated dna 142 elif row_len == 5: 143 # set sequence indexes 144 idx['qannot'] = 0 145 idx['query'] = 1 146 idx['midline'] = 2 147 idx['hit'] = 3 148 idx['hannot'] = 4 149 else: 150 raise ValueError("Unexpected row count in alignment block: " 151 "%i" % row_len) 152 return idx
153 154
155 -def _get_blocks(rows, coords, idx):
156 """Returns a list of dictionaries of sequences split by the coordinates.""" 157 for idx_name in ('query', 'hit', 'midline', 'qannot', 'hannot'): 158 assert idx_name in idx 159 blocks = [] 160 for start, end in coords: 161 block = {} 162 # get seqs according to index 163 block['query'] = rows[idx['query']][start:end] 164 block['hit'] = rows[idx['hit']][start:end] 165 block['homology'] = rows[idx['midline']][start:end] 166 if idx['qannot'] is not None: 167 block['query_annotation'] = rows[idx['qannot']][start:end] 168 if idx['hannot'] is not None: 169 block['hit_annotation'] = rows[idx['hannot']][start:end] 170 blocks.append(block) 171 172 return blocks
173 174
175 -def _get_scodon_moves(tmp_seq_blocks):
176 """Returns a dictionary of split codon locations relative to each 177 fragment's end""" 178 scodon_moves = {'query': [], 'hit': []} 179 for seq_type in scodon_moves: 180 scoords = [] 181 for block in tmp_seq_blocks: 182 # check both ends of the sequence for residues in curly braces 183 m_start = re.search(_RE_SCODON_START, block[seq_type]) 184 m_end = re.search(_RE_SCODON_END, block[seq_type]) 185 if m_start: 186 m_start = len(m_start.group(1)) 187 scoords.append((m_start, 0)) 188 else: 189 scoords.append((0, 0)) 190 if m_end: 191 m_end = len(m_end.group(1)) 192 scoords.append((0, m_end)) 193 else: 194 scoords.append((0, 0)) 195 scodon_moves[seq_type] = scoords 196 197 return scodon_moves
198 199
200 -def _clean_blocks(tmp_seq_blocks):
201 """Removes curly braces (split codon markers) from the given sequences.""" 202 seq_blocks = [] 203 for seq_block in tmp_seq_blocks: 204 for line_name in seq_block: 205 seq_block[line_name] = \ 206 seq_block[line_name].replace('{', '').replace('}', '') 207 seq_blocks.append(seq_block) 208 209 return seq_blocks
210 211
212 -def _comp_intron_lens(seq_type, inter_blocks, raw_inter_lens):
213 """Returns the length of introns between fragments.""" 214 # set opposite type, for setting introns 215 opp_type = 'hit' if seq_type == 'query' else 'query' 216 # list of flags to denote if an intron follows a block 217 # it reads e.g. this line: 218 # "ATGTT{TT} >>>> Target Intron 1 >>>> {G}TGTGTGTACATT" 219 # and sets the opposing sequence type's intron (since this 220 # line is present on the opposite sequence type line) 221 has_intron_after = ['Intron' in x[seq_type] for x in 222 inter_blocks] 223 assert len(has_intron_after) == len(raw_inter_lens) 224 # create list containing coord adjustments incorporating 225 # intron lengths 226 inter_lens = [] 227 for flag, parsed_len in zip(has_intron_after, raw_inter_lens): 228 if flag: 229 # joint introns 230 if all(parsed_len[:2]): 231 # intron len is [0] if opp_type is query, otherwise it's [1] 232 intron_len = int(parsed_len[0]) if opp_type == 'query' \ 233 else int(parsed_len[1]) 234 # single hit/query introns 235 elif parsed_len[2]: 236 intron_len = int(parsed_len[2]) 237 else: 238 raise ValueError("Unexpected intron parsing " 239 "result: %r" % parsed_len) 240 else: 241 intron_len = 0 242 243 inter_lens.append(intron_len) 244 245 return inter_lens
246 247
248 -def _comp_coords(hsp, seq_type, inter_lens):
249 """Fill the block coordinates of the given hsp dictionary.""" 250 assert seq_type in ('hit', 'query') 251 # manually fill the first coord 252 seq_step = 1 if hsp['%s_strand' % seq_type] >= 0 else -1 253 fstart = hsp['%s_start' % seq_type] 254 # fend is fstart + number of residues in the sequence, minus gaps 255 fend = fstart + len( 256 hsp[seq_type][0].replace('-','').replace('>', 257 '').replace('<', '')) * seq_step 258 coords = [(fstart, fend)] 259 # and start from the second block, after the first inter seq 260 for idx, block in enumerate(hsp[seq_type][1:]): 261 bstart = coords[-1][1] + inter_lens[idx] * seq_step 262 bend = bstart + seq_step * \ 263 len(block.replace('-', '')) 264 coords.append((bstart, bend)) 265 266 # adjust the coords so the smallest is [0], if strand is -1 267 # couldn't do this in the previous steps since we need the initial 268 # block ordering 269 if seq_step != 1: 270 for idx, coord in enumerate(coords): 271 coords[idx] = coords[idx][1], coords[idx][0] 272 273 return coords
274 275
276 -def _comp_split_codons(hsp, seq_type, scodon_moves):
277 """Computes the positions of split codons and puts the values in the given 278 HSP dictionary.""" 279 scodons = [] 280 for idx in range(len(scodon_moves[seq_type])): 281 pair = scodon_moves[seq_type][idx] 282 if not any(pair): 283 continue 284 else: 285 assert not all(pair) 286 a, b = pair 287 anchor_pair = hsp['%s_ranges' % seq_type][idx // 2] 288 strand = 1 if hsp['%s_strand' % seq_type] >= 0 else -1 289 290 if a: 291 func = max if strand == 1 else min 292 anchor = func(anchor_pair) 293 start_c, end_c = anchor + a * strand * -1, anchor 294 elif b: 295 func = min if strand == 1 else max 296 anchor = func(anchor_pair) 297 start_c, end_c = anchor + b * strand, anchor 298 scodons.append((min(start_c, end_c), max(start_c, end_c))) 299 300 return scodons
301 302
303 -class ExonerateTextParser(_BaseExonerateParser):
304 305 """Parser for Exonerate plain text output.""" 306 307 _ALN_MARK = 'C4 Alignment:' 308
309 - def parse_alignment_block(self, header):
310 qresult = header['qresult'] 311 hit = header['hit'] 312 hsp = header['hsp'] 313 # check for values that must have been set by previous methods 314 for val_name in ('query_start', 'query_end' ,'hit_start', 'hit_end', 315 'query_strand', 'hit_strand'): 316 assert val_name in hsp, hsp 317 318 # get the alignment rows 319 # and stitch them so we have the full sequences in single strings 320 raw_aln_blocks, vulgar_comp = self._read_alignment() 321 # cmbn_rows still has split codon markers (curly braces) 322 cmbn_rows = _stitch_rows(raw_aln_blocks) 323 row_dict = _get_row_dict(len(cmbn_rows)) 324 # get the sequence blocks 325 has_ner = 'NER' in qresult['model'].upper() 326 seq_coords = _get_block_coords(cmbn_rows, row_dict, has_ner) 327 tmp_seq_blocks = _get_blocks(cmbn_rows, seq_coords, row_dict) 328 # get split codon temp coords for later use 329 # this result in pairs of base movement for both ends of each row 330 scodon_moves = _get_scodon_moves(tmp_seq_blocks) 331 # remove the split codon markers 332 seq_blocks = _clean_blocks(tmp_seq_blocks) 333 334 # adjust strands 335 hsp['query_strand'] = _STRAND_MAP[hsp['query_strand']] 336 hsp['hit_strand'] = _STRAND_MAP[hsp['hit_strand']] 337 # cast coords into ints 338 hsp['query_start'] = int(hsp['query_start']) 339 hsp['query_end'] = int(hsp['query_end']) 340 hsp['hit_start'] = int(hsp['hit_start']) 341 hsp['hit_end'] = int(hsp['hit_end']) 342 # cast score into ints 343 hsp['score'] = int(hsp['score']) 344 # set sequences 345 hsp['query'] = [x['query'] for x in seq_blocks] 346 hsp['hit'] = [x['hit'] for x in seq_blocks] 347 hsp['aln_annotation'] = {} 348 # set the alphabet 349 # currently only limited to models with protein queries 350 if 'protein2' in qresult['model'] or 'coding2' in qresult['model']: 351 hsp['alphabet'] = generic_protein 352 # get the annotations if they exist 353 for annot_type in ('homology', 'query_annotation', 'hit_annotation'): 354 try: 355 hsp['aln_annotation'][annot_type] = \ 356 [x[annot_type] for x in seq_blocks] 357 except KeyError: 358 pass 359 360 # use vulgar coordinates if vulgar line is present and return 361 #if vulgar_comp is not None: 362 # hsp = parse_vulgar_comp(hsp, vulgar_comp) 363 364 # return {'qresult': qresult, 'hit': hit, 'hsp': hsp} 365 366 # otherwise we need to get the coordinates from the alignment 367 # get the intervening blocks first, so we can use them 368 # to adjust the coordinates 369 if not has_ner: 370 # get intervening coordinates and blocks, only if model is not ner 371 # ner models have a much more simple coordinate calculation 372 inter_coords = _get_inter_coords(seq_coords) 373 inter_blocks = _get_blocks(cmbn_rows, inter_coords, row_dict) 374 # returns a three-component tuple of intron lengths 375 # first two component filled == intron in hit and query 376 # last component filled == intron in hit or query 377 raw_inter_lens = re.findall(_RE_EXON_LEN, 378 cmbn_rows[row_dict['midline']]) 379 380 # compute start and end coords for each block 381 for seq_type in ('query', 'hit'): 382 383 # ner blocks and intron blocks require different adjustments 384 if not has_ner: 385 opp_type = 'hit' if seq_type == 'query' else 'query' 386 inter_lens = _comp_intron_lens(seq_type, inter_blocks, 387 raw_inter_lens) 388 else: 389 # for NER blocks, the length of the inter-fragment gaps is 390 # written on the same strand, so opp_type is seq_type 391 opp_type = seq_type 392 inter_lens = [int(x) for x in 393 re.findall(_RE_NER_LEN, cmbn_rows[row_dict[seq_type]])] 394 395 # check that inter_lens's length is len opp_type block - 1 396 assert len(inter_lens) == len(hsp[opp_type])-1, \ 397 "%r vs %r" % (len(inter_lens), len(hsp[opp_type])-1) 398 # fill the hsp query and hit coordinates 399 hsp['%s_ranges' % opp_type] = \ 400 _comp_coords(hsp, opp_type, inter_lens) 401 # and fill the split codon coordinates, if model != ner 402 # can't do this in the if-else clause above since we need to 403 # compute the ranges first 404 if not has_ner: 405 hsp['%s_split_codons' % opp_type] = \ 406 _comp_split_codons(hsp, opp_type, scodon_moves) 407 408 # now that we've finished parsing coords, we can set the hit and start 409 # coord according to Biopython's convention (start <= end) 410 for seq_type in ('query', 'hit'): 411 if hsp['%s_strand' % seq_type] == -1: 412 n_start = '%s_start' % seq_type 413 n_end = '%s_end' % seq_type 414 hsp[n_start], hsp[n_end] = hsp[n_end], hsp[n_start] 415 416 return {'qresult': qresult, 'hit': hit, 'hsp': hsp}
417
418 - def _read_alignment(self):
419 """Reads the raw alignment block strings, returns them in a list.""" 420 raw_aln_blocks = [] 421 # flag to check whether we're in an aligment row 422 in_aln_row = False 423 # flag for vulgar line, if present, we can parse coordinates from it 424 vulgar_comp = None 425 while True: 426 427 match = re.search(_RE_ALN_ROW, self.line.strip()) 428 # if we have a match, set flags and values 429 if match and not in_aln_row: 430 start_idx = self.line.index(match.group(1)) 431 row_len = len(match.group(1)) 432 in_aln_row = True 433 raw_aln_block = [] 434 # if we're in an alignment row, grab the sequence 435 if in_aln_row: 436 raw_aln_block.append(self.line[start_idx:start_idx+row_len]) 437 # reset flags and values if the line matches, we're in an alignment 438 # row, and there are more than 1 line in rows 439 if match and in_aln_row and len(raw_aln_block) > 1: 440 raw_aln_blocks.append(raw_aln_block) 441 start_idx = None 442 row_len = None 443 in_aln_row = False 444 445 self.line = self.handle.readline() 446 # try to parse vulgar line if present 447 if self.line.startswith('vulgar'): 448 vulgar = re.search(_RE_VULGAR, self.line) 449 vulgar_comp = vulgar.group(10) 450 if not self.line or self.line.startswith(self._ALN_MARK): 451 # HACK: this is so that the parse_qresult method does not 452 # yield the objects before appending the last HSP. We are doing 453 # this to keep the parser compatible with outputs without 454 # human-readable alignment outputs. This also relies on the 455 # fact that repeated readline() always returns '' on EOF. 456 if not self.line: 457 self.line = 'mock' 458 break 459 460 return raw_aln_blocks, vulgar_comp
461 462
463 -class ExonerateTextIndexer(_BaseExonerateIndexer):
464 465 """Indexer class for Exonerate plain text.""" 466 467 _parser = ExonerateTextParser 468 _query_mark = _as_bytes('C4 Alignment') 469
470 - def get_qresult_id(self, pos):
471 """Returns the query ID from the nearest "Query:" line.""" 472 handle = self._handle 473 handle.seek(pos) 474 sentinel = _as_bytes('Query:') 475 476 while True: 477 line = handle.readline().strip() 478 if line.startswith(sentinel): 479 break 480 if not line: 481 raise StopIteration 482 qid, desc = _parse_hit_or_query_line(_bytes_to_string(line)) 483 484 return qid
485
486 - def get_raw(self, offset):
487 """Returns the raw string of a QueryResult object from the given offset.""" 488 handle = self._handle 489 handle.seek(offset) 490 qresult_key = None 491 qresult_raw = _as_bytes('') 492 493 while True: 494 line = handle.readline() 495 if not line: 496 break 497 elif line.startswith(self._query_mark): 498 cur_pos = handle.tell() 499 if qresult_key is None: 500 qresult_key = self.get_qresult_id(cur_pos) 501 else: 502 curr_key = self.get_qresult_id(cur_pos) 503 if curr_key != qresult_key: 504 break 505 handle.seek(cur_pos) 506 qresult_raw += line 507 508 return qresult_raw
509 510 511 # if not used as a module, run the doctest 512 if __name__ == "__main__": 513 from Bio._utils import run_doctest 514 run_doctest() 515