Package Bio :: Package SearchIO :: Package HmmerIO :: Module hmmer3_text
[hide private]
[frames] | no frames]

Source Code for Module Bio.SearchIO.HmmerIO.hmmer3_text

  1  # Copyright 2012 by Wibowo Arindrarto.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO parser for HMMER plain text output format.""" 
  7   
  8  import re 
  9   
 10  from Bio._py3k import _as_bytes, _bytes_to_string 
 11  from Bio._utils import read_forward 
 12  from Bio.Alphabet import generic_protein 
 13  from Bio.SearchIO._model import QueryResult, Hit, HSP, HSPFragment 
 14   
 15  from ._base import _BaseHmmerTextIndexer 
 16   
 17  __all__ = ('Hmmer3TextParser', 'Hmmer3TextIndexer') 
 18   
 19   
 20  # precompile regex patterns for faster processing 
 21  # regex for program name capture 
 22  _RE_PROGRAM = re.compile(r'^# (\w*hmm\w+) :: .*$') 
 23  # regex for version string capture 
 24  _RE_VERSION = re.compile(r'# \w+ ([\w+\.]+) .*; http.*$') 
 25  # regex for option string capture 
 26  _RE_OPT = re.compile(r'^# (.+):\s+(.+)$') 
 27  # regex for parsing query id and length, for parsing 
 28  _QRE_ID_LEN_PTN = r'^Query:\s*(.*)\s+\[\w=(\d+)\]' 
 29  _QRE_ID_LEN = re.compile(_QRE_ID_LEN_PTN) 
 30  # regex for hsp validation 
 31  _HRE_VALIDATE = re.compile(r'score:\s(-?\d+\.?\d+)\sbits.*value:\s(.*)') 
 32  # regexes for parsing hsp alignment blocks 
 33  _HRE_ANNOT_LINE = re.compile(r'^(\s+)(.+)\s(\w+)') 
 34  _HRE_ID_LINE = re.compile(r'^(\s+\S+\s+[0-9-]+ )(.+?)(\s+[0-9-]+)') 
 35   
 36   
37 -class Hmmer3TextParser(object):
38 """Parser for the HMMER 3.0 text output.""" 39
40 - def __init__(self, handle):
41 self.handle = handle 42 self.line = read_forward(self.handle) 43 self._meta = self._parse_preamble()
44
45 - def __iter__(self):
46 for qresult in self._parse_qresult(): 47 yield qresult
48
49 - def _read_until(self, bool_func):
50 """Reads the file handle until the given function returns True.""" 51 while True: 52 if not self.line or bool_func(self.line): 53 return 54 else: 55 self.line = read_forward(self.handle)
56
57 - def _parse_preamble(self):
58 """Parses HMMER preamble (lines beginning with '#').""" 59 meta = {} 60 # bool flag for storing state ~ whether we are parsing the option 61 # lines or not 62 has_opts = False 63 while True: 64 # no pound sign means we've left the preamble 65 if not self.line.startswith('#'): 66 break 67 # dashes could either mean we are entering or leaving the options 68 # section ~ so it's a switch for the has_opts flag 69 elif '- - -' in self.line: 70 if not has_opts: 71 # if flag is false, that means we're entering opts 72 # so switch the flag accordingly 73 has_opts = True 74 else: 75 # if flag is true, that means we've reached the end of opts 76 # so we can break out of the function 77 break 78 elif not has_opts: 79 # try parsing program 80 regx = re.search(_RE_PROGRAM, self.line) 81 if regx: 82 meta['program'] = regx.group(1) 83 # try parsing version 84 regx = re.search(_RE_VERSION, self.line) 85 if regx: 86 meta['version'] = regx.group(1) 87 elif has_opts: 88 regx = re.search(_RE_OPT, self.line) 89 # if target in regx.group(1), then we store the key as target 90 if 'target' in regx.group(1): 91 meta['target'] = regx.group(2).strip() 92 else: 93 meta[regx.group(1)] = regx.group(2) 94 95 self.line = read_forward(self.handle) 96 97 return meta
98
99 - def _parse_qresult(self):
100 """Parses a HMMER3 query block.""" 101 self._read_until(lambda line: line.startswith('Query:')) 102 103 while self.line: 104 105 regx = re.search(_QRE_ID_LEN, self.line) 106 107 while not regx: 108 self.line = read_forward(self.handle) 109 regx = re.search(_QRE_ID_LEN, self.line) 110 111 # get query id and length 112 qid = regx.group(1).strip() 113 # store qresult attributes 114 qresult_attrs = { 115 'seq_len': int(regx.group(2)), 116 'program': self._meta.get('program'), 117 'version': self._meta.get('version'), 118 'target': self._meta.get('target'), 119 } 120 121 # get description and accession, if they exist 122 qdesc = '<unknown description>' # placeholder 123 while not self.line.startswith('Scores for '): 124 self.line = read_forward(self.handle) 125 126 if self.line.startswith('Accession:'): 127 acc = self.line.strip().split(' ', 1)[1] 128 qresult_attrs['accession'] = acc.strip() 129 elif self.line.startswith('Description:'): 130 qdesc = self.line.strip().split(' ', 1)[1].strip() 131 qresult_attrs['description'] = qdesc 132 133 # parse the query hits 134 while self.line and '//' not in self.line: 135 hit_list = self._parse_hit(qid, qdesc) 136 # read through the statistics summary 137 # TODO: parse and store this information? 138 if self.line.startswith('Internal pipeline'): 139 while self.line and '//' not in self.line: 140 self.line = read_forward(self.handle) 141 142 # create qresult, set its attributes and yield 143 # not initializing hit_list directly to handle empty hits 144 # (i.e. need to set its query description manually) 145 qresult = QueryResult(id=qid, hits=hit_list) 146 for attr, value in qresult_attrs.items(): 147 setattr(qresult, attr, value) 148 yield qresult 149 self.line = read_forward(self.handle) 150 151 # Skip line beginning with '# Alignment of', which are output 152 # when running phmmer with the '-A' flag. 153 if self.line.startswith('#'): 154 self.line = self.handle.readline() 155 156 # HMMER >= 3.1 outputs '[ok]' at the end of all results file, 157 # which means we can break the main loop when we see the line 158 if '[ok]' in self.line: 159 break
160
161 - def _parse_hit(self, qid, qdesc):
162 """Parses a HMMER3 hit block, beginning with the hit table.""" 163 # get to the end of the hit table delimiter and read one more line 164 self._read_until(lambda line: 165 line.startswith(' ------- ------ -----')) 166 self.line = read_forward(self.handle) 167 168 # assume every hit is in inclusion threshold until the inclusion 169 # threshold line is encountered 170 is_included = True 171 172 # parse the hit table 173 hit_attr_list = [] 174 while True: 175 if not self.line: 176 return [] 177 elif self.line.startswith(' ------ inclusion'): 178 is_included = False 179 self.line = read_forward(self.handle) 180 # if there are no hits, then there are no hsps 181 # so we forward-read until 'Internal pipeline..' 182 elif self.line.startswith(' [No hits detected that satisfy ' 183 'reporting'): 184 while True: 185 self.line = read_forward(self.handle) 186 if self.line.startswith('Internal pipeline'): 187 assert len(hit_attr_list) == 0 188 return [] 189 elif self.line.startswith('Domain annotation for each '): 190 hit_list = self._create_hits(hit_attr_list, qid, qdesc) 191 return hit_list 192 # entering hit results row 193 # parse the columns into a list 194 row = [x for x in self.line.strip().split(' ') if x] 195 # join the description words if it's >1 word 196 if len(row) > 10: 197 row[9] = ' '.join(row[9:]) 198 # if there's no description, set it to an empty string 199 elif len(row) < 10: 200 row.append('') 201 assert len(row) == 10 202 # create the hit object 203 hit_attrs = { 204 'id': row[8], 205 'query_id': qid, 206 'evalue': float(row[0]), 207 'bitscore': float(row[1]), 208 'bias': float(row[2]), 209 # row[3:6] is not parsed, since the info is available 210 # at the HSP level 211 'domain_exp_num': float(row[6]), 212 'domain_obs_num': int(row[7]), 213 'description': row[9], 214 'is_included': is_included, 215 } 216 hit_attr_list.append(hit_attrs) 217 218 self.line = read_forward(self.handle)
219
220 - def _create_hits(self, hit_attrs, qid, qdesc):
221 """Parses a HMMER3 hsp block, beginning with the hsp table.""" 222 # read through until the beginning of the hsp block 223 self._read_until(lambda line: line.startswith('Internal pipeline') or 224 line.startswith('>>')) 225 226 # start parsing the hsp block 227 hit_list = [] 228 while True: 229 if self.line.startswith('Internal pipeline'): 230 # by this time we should've emptied the hit attr list 231 assert len(hit_attrs) == 0 232 return hit_list 233 assert self.line.startswith('>>') 234 hid, hdesc = self.line[len('>> '):].split(' ', 1) 235 hdesc = hdesc.strip() 236 237 # read through the hsp table header and move one more line 238 self._read_until(lambda line: 239 line.startswith(' --- ------ ----- --------') or 240 line.startswith(' [No individual domains')) 241 self.line = read_forward(self.handle) 242 243 # parse the hsp table for the current hit 244 hsp_list = [] 245 while True: 246 # break out of hsp parsing if there are no hits, it's the last hsp 247 # or it's the start of a new hit 248 if self.line.startswith(' [No targets detected that satisfy') or \ 249 self.line.startswith(' [No individual domains') or \ 250 self.line.startswith('Internal pipeline statistics summary:') or \ 251 self.line.startswith(' Alignments for each domain:') or \ 252 self.line.startswith('>>'): 253 254 hit_attr = hit_attrs.pop(0) 255 hit = Hit(hsp_list) 256 for attr, value in hit_attr.items(): 257 if attr == "description": 258 cur_val = getattr(hit, attr) 259 if cur_val and value and cur_val.startswith(value): 260 continue 261 setattr(hit, attr, value) 262 if not hit: 263 hit.query_description = qdesc 264 hit_list.append(hit) 265 break 266 267 parsed = [x for x in self.line.strip().split(' ') if x] 268 assert len(parsed) == 16 269 # parsed column order: 270 # index, is_included, bitscore, bias, evalue_cond, evalue 271 # hmmfrom, hmmto, query_ends, hit_ends, alifrom, alito, 272 # envfrom, envto, acc_avg 273 frag = HSPFragment(hid, qid) 274 # set query and hit descriptions if they are defined / nonempty string 275 if qdesc: 276 frag.query_description = qdesc 277 if hdesc: 278 frag.hit_description = hdesc 279 # HMMER3 alphabets are always protein alphabets 280 frag.alphabet = generic_protein 281 # depending on whether the program is hmmsearch, hmmscan, or phmmer 282 # {hmm,ali}{from,to} can either be hit_{from,to} or query_{from,to} 283 # for hmmscan, hit is the hmm profile, query is the sequence 284 if self._meta.get('program') == 'hmmscan': 285 # adjust 'from' and 'to' coordinates to 0-based ones 286 frag.hit_start = int(parsed[6]) - 1 287 frag.hit_end = int(parsed[7]) 288 frag.query_start = int(parsed[9]) - 1 289 frag.query_end = int(parsed[10]) 290 elif self._meta.get('program') in ['hmmsearch', 'phmmer']: 291 # adjust 'from' and 'to' coordinates to 0-based ones 292 frag.hit_start = int(parsed[9]) - 1 293 frag.hit_end = int(parsed[10]) 294 frag.query_start = int(parsed[6]) - 1 295 frag.query_end = int(parsed[7]) 296 # strand is always 0, since HMMER now only handles protein 297 frag.hit_strand = frag.query_strand = 0 298 299 hsp = HSP([frag]) 300 hsp.domain_index = int(parsed[0]) 301 hsp.is_included = parsed[1] == '!' 302 hsp.bitscore = float(parsed[2]) 303 hsp.bias = float(parsed[3]) 304 hsp.evalue_cond = float(parsed[4]) 305 hsp.evalue = float(parsed[5]) 306 if self._meta.get('program') == 'hmmscan': 307 # adjust 'from' and 'to' coordinates to 0-based ones 308 hsp.hit_endtype = parsed[8] 309 hsp.query_endtype = parsed[11] 310 elif self._meta.get('program') in ['hmmsearch', 'phmmer']: 311 # adjust 'from' and 'to' coordinates to 0-based ones 312 hsp.hit_endtype = parsed[11] 313 hsp.query_endtype = parsed[8] 314 # adjust 'from' and 'to' coordinates to 0-based ones 315 hsp.env_start = int(parsed[12]) - 1 316 hsp.env_end = int(parsed[13]) 317 hsp.env_endtype = parsed[14] 318 hsp.acc_avg = float(parsed[15]) 319 320 hsp_list.append(hsp) 321 self.line = read_forward(self.handle) 322 323 # parse the hsp alignments 324 if self.line.startswith(' Alignments for each domain:'): 325 self._parse_aln_block(hid, hit.hsps)
326
327 - def _parse_aln_block(self, hid, hsp_list):
328 """Parses a HMMER3 HSP alignment block.""" 329 self.line = read_forward(self.handle) 330 dom_counter = 0 331 while True: 332 if self.line.startswith('>>') or \ 333 self.line.startswith('Internal pipeline'): 334 return hsp_list 335 assert self.line.startswith(' == domain %i' % (dom_counter + 1)) 336 # alias hsp to local var 337 # but note that we're still changing the attrs of the actual 338 # hsp inside the qresult as we're not creating a copy 339 frag = hsp_list[dom_counter][0] 340 # XXX: should we validate again here? regex is expensive.. 341 # regx = re.search(_HRE_VALIDATE, self.line) 342 # assert hsp.bitscore == float(regx.group(1)) 343 # assert hsp.evalue_cond == float(regx.group(2)) 344 hmmseq = '' 345 aliseq = '' 346 annot = {} 347 self.line = self.handle.readline() 348 349 # parse all the alignment blocks in the hsp 350 while True: 351 352 regx = None 353 354 # check for hit or query line 355 # we don't check for the hit or query id specifically 356 # to anticipate special cases where query id == hit id 357 regx = re.search(_HRE_ID_LINE, self.line) 358 if regx: 359 # the first hit/query self.line we encounter is the hmmseq 360 if len(hmmseq) == len(aliseq): 361 hmmseq += regx.group(2) 362 # and for subsequent self.lines, len(hmmseq) is either 363 # > or == len(aliseq) 364 elif len(hmmseq) > len(aliseq): 365 aliseq += regx.group(2) 366 assert len(hmmseq) >= len(aliseq) 367 # check for start of new domain 368 elif self.line.startswith(' == domain') or \ 369 self.line.startswith('>>') or \ 370 self.line.startswith('Internal pipeline'): 371 frag.aln_annotation = annot 372 if self._meta.get('program') == 'hmmscan': 373 frag.hit = hmmseq 374 frag.query = aliseq 375 elif self._meta.get('program') in ['hmmsearch', 'phmmer']: 376 frag.hit = aliseq 377 frag.query = hmmseq 378 dom_counter += 1 379 hmmseq = '' 380 aliseq = '' 381 annot = {} 382 break 383 # otherwise check if it's an annotation line and parse it 384 # len(hmmseq) is only != len(aliseq) when the cursor is parsing 385 # the similarity character. Since we're not parsing that, we 386 # check for when the condition is False (i.e. when it's ==) 387 elif len(hmmseq) == len(aliseq): 388 regx = re.search(_HRE_ANNOT_LINE, self.line) 389 if regx: 390 annot_name = regx.group(3) 391 if annot_name in annot: 392 annot[annot_name] += regx.group(2) 393 else: 394 annot[annot_name] = regx.group(2) 395 396 self.line = self.handle.readline()
397 398
399 -class Hmmer3TextIndexer(_BaseHmmerTextIndexer):
400 """Indexer class for HMMER plain text output.""" 401 402 _parser = Hmmer3TextParser 403 qresult_start = _as_bytes('Query: ') 404 qresult_end = _as_bytes('//') 405
406 - def __iter__(self):
407 handle = self._handle 408 handle.seek(0) 409 start_offset = handle.tell() 410 regex_id = re.compile(_as_bytes(_QRE_ID_LEN_PTN)) 411 412 while True: 413 line = read_forward(handle) 414 end_offset = handle.tell() 415 416 if line.startswith(self.qresult_start): 417 regx = re.search(regex_id, line) 418 qresult_key = regx.group(1).strip() 419 # qresult start offset is the offset of this line 420 # (starts with the start mark) 421 start_offset = end_offset - len(line) 422 elif line.startswith(self.qresult_end): 423 yield _bytes_to_string(qresult_key), start_offset, 0 424 start_offset = end_offset 425 elif not line: 426 break
427 428 429 # if not used as a module, run the doctest 430 if __name__ == "__main__": 431 from Bio._utils import run_doctest 432 run_doctest() 433