Package Bio :: Package SearchIO :: Package HmmerIO :: Module hmmer3_text
[hide private]
[frames] | no frames]

Source Code for Module Bio.SearchIO.HmmerIO.hmmer3_text

  1  # Copyright 2012 by Wibowo Arindrarto.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Bio.SearchIO parser for HMMER plain text output format.""" 
  7   
  8  import re 
  9   
 10  from Bio._py3k import _as_bytes, _bytes_to_string 
 11  from Bio._utils import read_forward 
 12  from Bio.Alphabet import generic_protein 
 13  from Bio.SearchIO._model import QueryResult, Hit, HSP, HSPFragment 
 14   
 15  from ._base import _BaseHmmerTextIndexer 
 16   
 17  __all__ = ['Hmmer3TextParser', 'Hmmer3TextIndexer'] 
 18   
 19  __docformat__ = "restructuredtext en" 
 20   
 21   
 22  # precompile regex patterns for faster processing 
 23  # regex for program name capture 
 24  _RE_PROGRAM = re.compile(r'^# (\w*hmm\w+) :: .*$') 
 25  # regex for version string capture 
 26  _RE_VERSION = re.compile(r'# \w+ ([\w+\.]+) .*; http.*$') 
 27  # regex for option string capture 
 28  _RE_OPT = re.compile(r'^# (.+):\s+(.+)$') 
 29  # regex for parsing query id and length, for parsing 
 30  _QRE_ID_LEN_PTN = r'^Query:\s*(.*)\s+\[\w=(\d+)\]' 
 31  _QRE_ID_LEN = re.compile(_QRE_ID_LEN_PTN) 
 32  # regex for hsp validation 
 33  _HRE_VALIDATE = re.compile(r'score:\s(-?\d+\.?\d+)\sbits.*value:\s(.*)') 
 34  # regexes for parsing hsp alignment blocks 
 35  _HRE_ANNOT_LINE = re.compile(r'^(\s+)(.+)\s(\w+)') 
 36  _HRE_ID_LINE = re.compile(r'^(\s+\S+\s+[0-9-]+ )(.+?)(\s+[0-9-]+)') 
 37   
 38   
39 -class Hmmer3TextParser(object):
40 41 """Parser for the HMMER 3.0 text output.""" 42
43 - def __init__(self, handle):
44 self.handle = handle 45 self.line = read_forward(self.handle) 46 self._meta = self._parse_preamble()
47
48 - def __iter__(self):
49 for qresult in self._parse_qresult(): 50 yield qresult
51
52 - def _read_until(self, bool_func):
53 """Reads the file handle until the given function returns True.""" 54 while True: 55 if not self.line or bool_func(self.line): 56 return 57 else: 58 self.line = read_forward(self.handle)
59
60 - def _parse_preamble(self):
61 """Parses HMMER preamble (lines beginning with '#').""" 62 meta = {} 63 # bool flag for storing state ~ whether we are parsing the option 64 # lines or not 65 has_opts = False 66 while True: 67 # no pound sign means we've left the preamble 68 if not self.line.startswith('#'): 69 break 70 # dashes could either mean we are entering or leaving the options 71 # section ~ so it's a switch for the has_opts flag 72 elif '- - -' in self.line: 73 if not has_opts: 74 # if flag is false, that means we're entering opts 75 # so switch the flag accordingly 76 has_opts = True 77 else: 78 # if flag is true, that means we've reached the end of opts 79 # so we can break out of the function 80 break 81 elif not has_opts: 82 # try parsing program 83 regx = re.search(_RE_PROGRAM, self.line) 84 if regx: 85 meta['program'] = regx.group(1) 86 # try parsing version 87 regx = re.search(_RE_VERSION, self.line) 88 if regx: 89 meta['version'] = regx.group(1) 90 elif has_opts: 91 regx = re.search(_RE_OPT, self.line) 92 # if target in regx.group(1), then we store the key as target 93 if 'target' in regx.group(1): 94 meta['target'] = regx.group(2).strip() 95 else: 96 meta[regx.group(1)] = regx.group(2) 97 98 self.line = read_forward(self.handle) 99 100 return meta
101
102 - def _parse_qresult(self):
103 """Parses a HMMER3 query block.""" 104 105 self._read_until(lambda line: line.startswith('Query:')) 106 107 while self.line: 108 109 # get query id and length 110 regx = re.search(_QRE_ID_LEN, self.line) 111 qid = regx.group(1).strip() 112 # store qresult attributes 113 qresult_attrs = { 114 'seq_len': int(regx.group(2)), 115 'program': self._meta.get('program'), 116 'version': self._meta.get('version'), 117 'target': self._meta.get('target'), 118 } 119 120 # get description and accession, if they exist 121 qdesc = '<unknown description>' # placeholder 122 while not self.line.startswith('Scores for '): 123 self.line = read_forward(self.handle) 124 125 if self.line.startswith('Accession:'): 126 acc = self.line.strip().split(' ', 1)[1] 127 qresult_attrs['accession'] = acc.strip() 128 elif self.line.startswith('Description:'): 129 qdesc = self.line.strip().split(' ', 1)[1].strip() 130 qresult_attrs['description'] = qdesc 131 132 # parse the query hits 133 while self.line and '//' not in self.line: 134 hit_list = self._parse_hit(qid, qdesc) 135 # read through the statistics summary 136 # TODO: parse and store this information? 137 if self.line.startswith('Internal pipeline'): 138 while self.line and '//' not in self.line: 139 self.line = read_forward(self.handle) 140 141 # create qresult, set its attributes and yield 142 # not initializing hit_list directly to handle empty hits 143 # (i.e. need to set its query description manually) 144 qresult = QueryResult(id=qid, hits=hit_list) 145 for attr, value in qresult_attrs.items(): 146 setattr(qresult, attr, value) 147 yield qresult 148 self.line = read_forward(self.handle) 149 150 # HMMER >= 3.1 outputs '[ok]' at the end of all results file, 151 # which means we can break the main loop when we see the line 152 if '[ok]' in self.line: 153 break
154
155 - def _parse_hit(self, qid, qdesc):
156 """Parses a HMMER3 hit block, beginning with the hit table.""" 157 # get to the end of the hit table delimiter and read one more line 158 self._read_until(lambda line: 159 line.startswith(' ------- ------ -----')) 160 self.line = read_forward(self.handle) 161 162 # assume every hit is in inclusion threshold until the inclusion 163 # threshold line is encountered 164 is_included = True 165 166 # parse the hit table 167 hit_attr_list = [] 168 while True: 169 if not self.line: 170 return [] 171 elif self.line.startswith(' ------ inclusion'): 172 is_included = False 173 self.line = read_forward(self.handle) 174 # if there are no hits, then there are no hsps 175 # so we forward-read until 'Internal pipeline..' 176 elif self.line.startswith(' [No hits detected that satisfy ' 177 'reporting'): 178 while True: 179 self.line = read_forward(self.handle) 180 if self.line.startswith('Internal pipeline'): 181 assert len(hit_attr_list) == 0 182 return [] 183 elif self.line.startswith('Domain annotation for each '): 184 hit_list = self._create_hits(hit_attr_list, qid, qdesc) 185 return hit_list 186 # entering hit results row 187 # parse the columns into a list 188 row = [x for x in self.line.strip().split(' ') if x] 189 # join the description words if it's >1 word 190 if len(row) > 10: 191 row[9] = ' '.join(row[9:]) 192 # if there's no description, set it to an empty string 193 elif len(row) < 10: 194 row.append('') 195 assert len(row) == 10 196 # create the hit object 197 hit_attrs = { 198 'id': row[8], 199 'query_id': qid, 200 'evalue': float(row[0]), 201 'bitscore': float(row[1]), 202 'bias': float(row[2]), 203 # row[3:6] is not parsed, since the info is available 204 # at the HSP level 205 'domain_exp_num': float(row[6]), 206 'domain_obs_num': int(row[7]), 207 'description': row[9], 208 'is_included': is_included, 209 } 210 hit_attr_list.append(hit_attrs) 211 212 self.line = read_forward(self.handle)
213
214 - def _create_hits(self, hit_attrs, qid, qdesc):
215 """Parses a HMMER3 hsp block, beginning with the hsp table.""" 216 # read through until the beginning of the hsp block 217 self._read_until(lambda line: line.startswith('Internal pipeline') 218 or line.startswith('>>')) 219 220 # start parsing the hsp block 221 hit_list = [] 222 while True: 223 if self.line.startswith('Internal pipeline'): 224 # by this time we should've emptied the hit attr list 225 assert len(hit_attrs) == 0 226 return hit_list 227 assert self.line.startswith('>>') 228 hid, hdesc = self.line[len('>> '):].split(' ', 1) 229 230 # read through the hsp table header and move one more line 231 self._read_until(lambda line: 232 line.startswith(' --- ------ ----- --------') or 233 line.startswith(' [No individual domains')) 234 self.line = read_forward(self.handle) 235 236 # parse the hsp table for the current hit 237 hsp_list = [] 238 while True: 239 # break out of hsp parsing if there are no hits, it's the last hsp 240 # or it's the start of a new hit 241 if self.line.startswith(' [No targets detected that satisfy') or \ 242 self.line.startswith(' [No individual domains') or \ 243 self.line.startswith('Internal pipeline statistics summary:') or \ 244 self.line.startswith(' Alignments for each domain:') or \ 245 self.line.startswith('>>'): 246 247 hit_attr = hit_attrs.pop(0) 248 hit = Hit(hsp_list) 249 for attr, value in hit_attr.items(): 250 setattr(hit, attr, value) 251 if not hit: 252 hit.query_description = qdesc 253 hit_list.append(hit) 254 break 255 256 parsed = [x for x in self.line.strip().split(' ') if x] 257 assert len(parsed) == 16 258 # parsed column order: 259 # index, is_included, bitscore, bias, evalue_cond, evalue 260 # hmmfrom, hmmto, query_ends, hit_ends, alifrom, alito, 261 # envfrom, envto, acc_avg 262 frag = HSPFragment(hid, qid) 263 # HMMER3 alphabets are always protein alphabets 264 frag.alphabet = generic_protein 265 # depending on whether the program is hmmsearch, hmmscan, or phmmer 266 # {hmm,ali}{from,to} can either be hit_{from,to} or query_{from,to} 267 # for hmmscan, hit is the hmm profile, query is the sequence 268 if self._meta.get('program') == 'hmmscan': 269 # adjust 'from' and 'to' coordinates to 0-based ones 270 frag.hit_start = int(parsed[6]) - 1 271 frag.hit_end = int(parsed[7]) 272 frag.query_start = int(parsed[9]) - 1 273 frag.query_end = int(parsed[10]) 274 elif self._meta.get('program') in ['hmmsearch', 'phmmer']: 275 # adjust 'from' and 'to' coordinates to 0-based ones 276 frag.hit_start = int(parsed[9]) - 1 277 frag.hit_end = int(parsed[10]) 278 frag.query_start = int(parsed[6]) - 1 279 frag.query_end = int(parsed[7]) 280 # strand is always 0, since HMMER now only handles protein 281 frag.hit_strand = frag.query_strand = 0 282 283 hsp = HSP([frag]) 284 hsp.domain_index = int(parsed[0]) 285 hsp.is_included = parsed[1] == '!' 286 hsp.bitscore = float(parsed[2]) 287 hsp.bias = float(parsed[3]) 288 hsp.evalue_cond = float(parsed[4]) 289 hsp.evalue = float(parsed[5]) 290 if self._meta.get('program') == 'hmmscan': 291 # adjust 'from' and 'to' coordinates to 0-based ones 292 hsp.hit_endtype = parsed[8] 293 hsp.query_endtype = parsed[11] 294 elif self._meta.get('program') in ['hmmsearch', 'phmmer']: 295 # adjust 'from' and 'to' coordinates to 0-based ones 296 hsp.hit_endtype = parsed[11] 297 hsp.query_endtype = parsed[8] 298 # adjust 'from' and 'to' coordinates to 0-based ones 299 hsp.env_start = int(parsed[12]) - 1 300 hsp.env_end = int(parsed[13]) 301 hsp.env_endtype = parsed[14] 302 hsp.acc_avg = float(parsed[15]) 303 304 hsp_list.append(hsp) 305 self.line = read_forward(self.handle) 306 307 # parse the hsp alignments 308 if self.line.startswith(' Alignments for each domain:'): 309 self._parse_aln_block(hid, hit.hsps)
310
311 - def _parse_aln_block(self, hid, hsp_list):
312 """Parses a HMMER3 HSP alignment block.""" 313 self.line = read_forward(self.handle) 314 dom_counter = 0 315 while True: 316 if self.line.startswith('>>') or \ 317 self.line.startswith('Internal pipeline'): 318 return hsp_list 319 assert self.line.startswith(' == domain %i' % (dom_counter + 1)) 320 # alias hsp to local var 321 # but note that we're still changing the attrs of the actual 322 # hsp inside the qresult as we're not creating a copy 323 frag = hsp_list[dom_counter][0] 324 # XXX: should we validate again here? regex is expensive.. 325 # regx = re.search(_HRE_VALIDATE, self.line) 326 # assert hsp.bitscore == float(regx.group(1)) 327 # assert hsp.evalue_cond == float(regx.group(2)) 328 hmmseq = '' 329 aliseq = '' 330 annot = {} 331 self.line = self.handle.readline() 332 333 # parse all the alignment blocks in the hsp 334 while True: 335 336 regx = None 337 338 # check for hit or query line 339 # we don't check for the hit or query id specifically 340 # to anticipate special cases where query id == hit id 341 regx = re.search(_HRE_ID_LINE, self.line) 342 if regx: 343 # the first hit/query self.line we encounter is the hmmseq 344 if len(hmmseq) == len(aliseq): 345 hmmseq += regx.group(2) 346 # and for subsequent self.lines, len(hmmseq) is either 347 # > or == len(aliseq) 348 elif len(hmmseq) > len(aliseq): 349 aliseq += regx.group(2) 350 assert len(hmmseq) >= len(aliseq) 351 # check for start of new domain 352 elif self.line.startswith(' == domain') or \ 353 self.line.startswith('>>') or \ 354 self.line.startswith('Internal pipeline'): 355 frag.aln_annotation = annot 356 if self._meta.get('program') == 'hmmscan': 357 frag.hit = hmmseq 358 frag.query = aliseq 359 elif self._meta.get('program') in ['hmmsearch', 'phmmer']: 360 frag.hit = aliseq 361 frag.query = hmmseq 362 dom_counter += 1 363 hmmseq = '' 364 aliseq = '' 365 annot = {} 366 break 367 # otherwise check if it's an annotation line and parse it 368 # len(hmmseq) is only != len(aliseq) when the cursor is parsing 369 # the similarity character. Since we're not parsing that, we 370 # check for when the condition is False (i.e. when it's ==) 371 elif len(hmmseq) == len(aliseq): 372 regx = re.search(_HRE_ANNOT_LINE, self.line) 373 if regx: 374 annot_name = regx.group(3) 375 if annot_name in annot: 376 annot[annot_name] += regx.group(2) 377 else: 378 annot[annot_name] = regx.group(2) 379 380 self.line = self.handle.readline()
381 382
383 -class Hmmer3TextIndexer(_BaseHmmerTextIndexer):
384 385 """Indexer class for HMMER plain text output.""" 386 387 _parser = Hmmer3TextParser 388 qresult_start = _as_bytes('Query: ') 389 qresult_end = _as_bytes('//') 390
391 - def __iter__(self):
392 handle = self._handle 393 handle.seek(0) 394 start_offset = handle.tell() 395 regex_id = re.compile(_as_bytes(_QRE_ID_LEN_PTN)) 396 397 while True: 398 line = read_forward(handle) 399 end_offset = handle.tell() 400 401 if line.startswith(self.qresult_start): 402 regx = re.search(regex_id, line) 403 qresult_key = regx.group(1).strip() 404 # qresult start offset is the offset of this line 405 # (starts with the start mark) 406 start_offset = end_offset - len(line) 407 elif line.startswith(self.qresult_end): 408 yield _bytes_to_string(qresult_key), start_offset, 0 409 start_offset = end_offset 410 elif not line: 411 break
412 413 # if not used as a module, run the doctest 414 if __name__ == "__main__": 415 from Bio._utils import run_doctest 416 run_doctest() 417